Datasets
Run Conversation Model On Dataset
Use the Collinear AI API to run a conversation model on your dataset.
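The snippets below assume a few globals are defined up front: the API host, your Collinear AI bearer token, and the ID of the space that holds your dataset. The values shown here are placeholders, not real endpoints or credentials; replace them with your own:

# Assumed setup for the snippets below; all values are placeholders.
host = 'https://api.collinear.example'   # Collinear AI API base URL (assumption)
token = 'YOUR_COLLINEAR_API_TOKEN'       # bearer token for the Authorization header
space_id = 'YOUR_SPACE_ID'               # ID of the space containing your dataset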
Step 1: Create Inference Function
The following Python code demonstrates how to create a function that runs inference on a dataset.
import asyncio

import pandas as pd
from tqdm import tqdm

from weaver.inference.clients import OpenAIConversationClient
from weaver.types import ConversationMessage, LimiterConfig


async def run_inference_on_dataset(data: pd.DataFrame,
                                   model: str,
                                   api_url: str | None = None,
                                   api_key: str | None = None,
                                   system_prompt: str | None = None,
                                   limiter_config: LimiterConfig | None = None):
    if api_url is None:
        api_url = 'https://api.openai.com/v1'
    if limiter_config is None:
        limiter_config = LimiterConfig(limit=1, time_period=1)

    client = OpenAIConversationClient(model=model, api_url=api_url, api_key=api_key,
                                      system_prompt=system_prompt, limiter=limiter_config)
    pbar = tqdm(total=len(data))

    async def generate(example):
        messages = [ConversationMessage(**m) for m in example["conv_prefix"]]
        response = await client.generate(messages)
        # Fall back to a sentinel value when the model returns nothing
        example["response"] = response[0] if response else "Model Failure"
        pbar.update(1)
        return example

    # Run one generation task per row, concurrently
    tasks = [asyncio.create_task(generate(row)) for _, row in data.iterrows()]
    return await asyncio.gather(*tasks)
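For a quick smoke test, you can call the function on a small hand-built DataFrame. Each row needs a conv_prefix column holding the conversation so far as a list of message dicts; the sketch below assumes ConversationMessage accepts role and content keyword arguments, as the OpenAI-style messages suggest, and uses the notebook's running event loop for the top-level await:

# Minimal sketch: run the inference function on a two-row toy dataset.
toy = pd.DataFrame({
    "conv_prefix": [
        [{"role": "user", "content": "What is the capital of France?"}],
        [{"role": "user", "content": "Summarize the plot of Hamlet."}],
    ]
})
results = await run_inference_on_dataset(toy, model='gpt-4o-mini',
                                         api_key='OPENAI_API_KEY')
for row in results:
    print(row["response"])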
Step 2: Create a Task Wrapper to Run Inference on the Dataset
The following Python code defines a Task class that
- Fetches the dataset annotations by dataset name
- Executes the plan steps in order
- Uploads the results to a new dataset
import requests


class Task:
    def __init__(self, space_id, dataset_name, plan, upload_name):
        self.space_id = space_id
        self.dataset_name = dataset_name
        self.plan = plan
        self.upload_name = upload_name
        self.dataset_id = ''

    def get_dataset_annotations_by_dataset(self):
        # Fetch the dataset annotations by dataset name
        url = f'{host}/api/v1/dataset/name'
        output = requests.post(url,
                               json={"space_id": self.space_id,
                                     "dataset_name": self.dataset_name},
                               headers={'Authorization': f'Bearer {token}'})
        response = output.json()
        self.dataset_id = response['dataset_id']
        dataset_annotations = response['dataset_annotations']
        return pd.DataFrame(dataset_annotations)

    def upload_new_dataset(self, df: pd.DataFrame):
        req_obj = {
            "name": self.upload_name,
            "space_id": self.space_id,
            "parent_dataset_id": self.dataset_id,
        }
        conversations = []
        df = pd.DataFrame(df)
        for index, row in df.iterrows():
            conversations.append({
                'conv_prefix': row['conv_prefix'],
                'response': row['response'],
                'judgements': {},
            })
        req_obj['conversations'] = conversations
        url = f'{host}/api/v1/dataset'
        output = requests.post(url, json=req_obj,
                               headers={'Authorization': f'Bearer {token}'})
        return output.json()

    async def execute(self):
        # Execute the plan steps in order
        data = self.get_dataset_annotations_by_dataset()  # Fetch dataset
        for step in self.plan:
            if isinstance(step, tuple):
                func, params = step
                data = await func(data, **params)
            else:
                data = await step(data)
        self.upload_new_dataset(data)
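Each plan step is either a bare async callable or a (callable, params) tuple, so you can chain extra processing after inference. As a hypothetical example, a step that drops failed generations before upload could look like this:

# Hypothetical extra plan step: any async callable that takes the dataset
# and returns the (possibly transformed) dataset fits the plan protocol.
async def drop_failures(data):
    df = pd.DataFrame(data)
    return df[df["response"] != "Model Failure"]

# Usage: plan=[(run_inference_on_dataset, {...}), drop_failures]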
# Example task definition
def example_eval():
    return Task(
        space_id=space_id,
        dataset_name='base_dataset',
        plan=[
            (run_inference_on_dataset, {
                'model': 'gpt-4o-mini',
                'api_key': 'OPENAI_API_KEY',  # replace with your OpenAI API key
            })
        ],
        # Name of the new dataset that upload_new_dataset will create
        upload_name='v1-ds-gpt-4o-mini',
    )

# To execute the task
example_task = example_eval()
await example_task.execute()
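The bare await on the last line works inside a Jupyter notebook, where an event loop is already running. In a plain Python script, wrap the call in asyncio.run instead:

import asyncio

if __name__ == "__main__":
    asyncio.run(example_eval().execute())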
You can download the full Jupyter Notebook from here.