Use the Collinear AI API to run a conversation model on your dataset. The walkthrough below fetches a dataset from your Collinear space, generates a model response for every conversation prefix, and uploads the results back as a new dataset.
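The code on this page references three module-level values that it does not define: `host` (the Collinear API base URL), `token` (your Collinear API token), and `space_id` (the space to read from and write to). One way to set them; the environment variable names and URL below are illustrative placeholders, not part of the API:

```python
import os

host = "https://api.collinear.ai"            # placeholder; use your API base URL
token = os.environ["COLLINEAR_API_KEY"]      # hypothetical env var name
space_id = os.environ["COLLINEAR_SPACE_ID"]  # hypothetical env var name
```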
```python
import asyncio
import os

import pandas as pd
import requests
from tqdm import tqdm

from weaver.inference.clients import OpenAIConversationClient
from weaver.types import ConversationMessage, LimiterConfig


def run_inference_on_dataset(
    data: pd.DataFrame,
    model: str,
    api_url: str | None = None,
    api_key: str | None = None,
    system_prompt: str | None = None,
    limiter_config: LimiterConfig | None = None,
):
    if api_url is None:
        api_url = "https://api.openai.com/v1"
    if limiter_config is None:
        # Default to at most 1 request per second
        limiter_config = LimiterConfig(limit=1, time_period=1)
    client = OpenAIConversationClient(
        model=model,
        api_url=api_url,
        api_key=api_key,
        system_prompt=system_prompt,
        limiter=limiter_config,
    )
    pbar = tqdm(total=len(data))

    async def generate(example):
        messages = [ConversationMessage(**m) for m in example["conv_prefix"]]
        response = await client.generate(messages)
        if not response:
            example["response"] = "Model Failure"
        else:
            example["response"] = response[0]
        pbar.update(1)
        return example

    async def run():
        tasks = [asyncio.create_task(generate(row)) for _, row in data.iterrows()]
        results = await asyncio.gather(*tasks)
        pbar.close()
        return results

    # Inside an already-running event loop (e.g. a notebook), return an
    # awaitable future; otherwise drive the loop ourselves.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    if loop is not None and loop.is_running():
        return asyncio.ensure_future(run())
    return asyncio.run(run())
```
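You can also try the helper on its own, outside the `Task` pipeline below, by calling it on any DataFrame with a `conv_prefix` column. A minimal sketch, assuming each message dict carries `role` and `content` keys as the `conv_prefix` format suggests:

```python
df = pd.DataFrame(
    [{"conv_prefix": [{"role": "user", "content": "Hello!"}]}]
)
results = run_inference_on_dataset(
    df,
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"],
)
# In a script this returns the list of rows directly; in a notebook it
# returns a future, so unwrap it with: results = await results
```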
The `Task` class ties everything together: it fetches a dataset by name from your Collinear space, runs each step in `plan` over it, and uploads the result as a new child dataset.

```python
class Task:
    def __init__(self, space_id, dataset_name, plan, upload_name):
        self.space_id = space_id
        self.dataset_name = dataset_name
        self.plan = plan
        self.upload_name = upload_name
        self.dataset_id = ""

    def get_dataset_annotations_by_dataset(self):
        # Look up the source dataset by name; `host` and `token` are the
        # module-level configuration values set earlier.
        url = f"{host}/api/v1/dataset/name"
        output = requests.post(
            url,
            json={"space_id": self.space_id, "dataset_name": self.dataset_name},
            headers={"Authorization": f"Bearer {token}"},
        )
        response = output.json()
        self.dataset_id = response["dataset_id"]
        dataset_annotations = response["dataset_annotations"]
        return pd.DataFrame(dataset_annotations)

    def upload_new_dataset(self, df: pd.DataFrame):
        req_obj = {
            "name": self.upload_name,
            "space_id": self.space_id,
            "parent_dataset_id": self.dataset_id,
        }
        conversations = []
        df = pd.DataFrame(df)  # accepts a DataFrame or a list of rows
        for _, row in df.iterrows():
            conversations.append({
                "conv_prefix": row["conv_prefix"],
                "response": row["response"],
                "judgements": {},
            })
        req_obj["conversations"] = conversations
        url = f"{host}/api/v1/dataset"
        output = requests.post(
            url,
            json=req_obj,
            headers={"Authorization": f"Bearer {token}"},
        )
        return output.json()

    async def execute(self):
        # Run the plan steps in order on the fetched dataset, then upload.
        data = self.get_dataset_annotations_by_dataset()
        for step in self.plan:
            if isinstance(step, tuple):
                func, params = step
                data = await func(data, **params)
            else:
                data = await step(data)
        self.upload_new_dataset(data)
```
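Because `execute` simply awaits each plan step with the running dataset, a step can be any async callable that accepts the data and returns the transformed data. A hypothetical extra step for illustration (the name and filtering logic are not part of the API):

```python
async def drop_failures(data):
    # Filter out rows where inference returned the "Model Failure" sentinel.
    df = pd.DataFrame(data)
    return df[df["response"] != "Model Failure"]
```

Bare callables go into the plan without a parameter dict: `plan=[(run_inference_on_dataset, {...}), drop_failures]`.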
```python
# Example task definition
def example_eval():
    return Task(
        space_id=space_id,
        dataset_name="base_dataset",
        plan=[
            (run_inference_on_dataset, {
                "model": "gpt-4o-mini",
                # Read the key from the environment rather than passing
                # the literal variable name as the key.
                "api_key": os.environ["OPENAI_API_KEY"],
            })
        ],
        # Name for the new dataset created by upload_new_dataset
        upload_name="v1-ds-gpt-4o-mini",
    )

# To execute the task (in a notebook, where an event loop is already running)
example_task = example_eval()
await example_task.execute()
```
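Top-level `await` only works where an event loop is already running, such as a Jupyter notebook. In a plain Python script, drive the loop explicitly instead:

```python
if __name__ == "__main__":
    example_task = example_eval()
    asyncio.run(example_task.execute())
```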