Step 1: Create Inference Function

The following Python code defines an async function that runs model inference over every conversation in a dataset, throttled by the client's rate limiter.

import asyncio

import pandas as pd
from tqdm import tqdm

from weaver.inference.clients import OpenAIConversationClient
from weaver.types import ConversationMessage, LimiterConfig


async def run_inference_on_dataset(data: pd.DataFrame, model: str,
                                   api_url: str | None = None,
                                   api_key: str | None = None,
                                   system_prompt: str | None = None,
                                   limiter_config: LimiterConfig | None = None):
    if api_url is None:
        api_url = 'https://api.openai.com/v1'
    if limiter_config is None:
        # Default to a conservative one request per second
        limiter_config = LimiterConfig(limit=1, time_period=1)
    client = OpenAIConversationClient(model=model, api_url=api_url, api_key=api_key,
                                      system_prompt=system_prompt, limiter=limiter_config)
    pbar = tqdm(total=len(data))

    async def generate(example):
        # Rebuild the conversation prefix and request a completion for it
        messages = [ConversationMessage(**m) for m in example["conv_prefix"]]
        response = await client.generate(messages)
        # Store a sentinel string when the client returns nothing
        example["response"] = response[0] if response else "Model Failure"
        pbar.update(1)
        return example

    # Schedule one task per row; the limiter throttles the actual request rate
    tasks = [asyncio.create_task(generate(row)) for _, row in data.iterrows()]
    results = await asyncio.gather(*tasks)
    pbar.close()
    return results
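
As a quick sanity check, you can call the function directly on a hand-built DataFrame. The sketch below assumes each conv_prefix entry is a list of message dicts whose keys match ConversationMessage (the role/content keys here are an assumption, not confirmed by the weaver docs) and that your key is exported as the OPENAI_API_KEY environment variable:

import os

# Two toy conversations; message keys are assumed to match ConversationMessage
toy = pd.DataFrame({
    "conv_prefix": [
        [{"role": "user", "content": "Say hello in one word."}],
        [{"role": "user", "content": "Name a primary color."}],
    ]
})

# In a notebook you can await directly; in a script, wrap this in asyncio.run(...)
rows = await run_inference_on_dataset(
    toy,
    model="gpt-4o-mini",
    api_key=os.environ.get("OPENAI_API_KEY"),
)
print(pd.DataFrame(rows)[["conv_prefix", "response"]])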

Step 2: Create a Task Wrapper to Run Inference on the Dataset

The following Python code defines a Task class that

  1. Fetches the dataset annotations by dataset name
  2. Executes the plan steps in order
  3. Uploads the results to a new dataset

import requests

# Assumes host, token, and space_id are defined earlier in the notebook
class Task:
    def __init__(self, space_id, dataset_name, plan, upload_name):
        self.dataset_name = dataset_name
        self.plan = plan
        self.upload_name = upload_name
        self.space_id = space_id
        self.dataset_id = ''

    def get_dataset_annotations_by_dataset(self):
        # Look up the dataset by name and return its annotations as a DataFrame
        url = f'{host}/api/v1/dataset/name'
        output = requests.post(url,
                               json={"space_id": self.space_id,
                                     "dataset_name": self.dataset_name},
                               headers={'Authorization': f'Bearer {token}'})
        output.raise_for_status()
        response = output.json()
        self.dataset_id = response['dataset_id']
        return pd.DataFrame(response['dataset_annotations'])

    def upload_new_dataset(self, df: pd.DataFrame):
        req_obj = {
            'name': self.upload_name,
            'space_id': self.space_id,
            'parent_dataset_id': self.dataset_id,
        }
        # Inference steps may return a list of rows, so normalize to a DataFrame
        df = pd.DataFrame(df)
        conversations = []
        for _, row in df.iterrows():
            conversations.append({
                'conv_prefix': row['conv_prefix'],
                'response': row['response'],
                'judgements': {},
            })
        req_obj['conversations'] = conversations

        url = f'{host}/api/v1/dataset'
        output = requests.post(url, json=req_obj,
                               headers={'Authorization': f'Bearer {token}'})
        output.raise_for_status()
        return output.json()

    async def execute(self):
        # Execute the plan steps in order, threading the data through each one
        data = self.get_dataset_annotations_by_dataset()  # Fetch dataset
        for step in self.plan:
            if isinstance(step, tuple):
                # A (callable, params) tuple: call with extra keyword arguments
                func, params = step
                data = await func(data, **params)
            else:
                # A bare async callable that only takes the data
                data = await step(data)
        self.upload_new_dataset(data)
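
For reference, the sketch below shows the response shape get_dataset_annotations_by_dataset relies on, inferred from the fields the code reads (dataset_id and dataset_annotations). It is illustrative only, and the message keys inside conv_prefix are an assumption:

# Assumed shape of the POST /api/v1/dataset/name response (illustrative)
{
    "dataset_id": "ds_abc123",
    "dataset_annotations": [
        {
            "conv_prefix": [
                {"role": "user", "content": "Hello!"}
            ]
        }
    ]
}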


# Example task definition
def example_eval():
    return Task(
        space_id=space_id,
        dataset_name='base_dataset',
        plan=[
            (run_inference_on_dataset, {
                'model': 'gpt-4o-mini',
                'api_key': os.environ.get('OPENAI_API_KEY'),  # read the key from the environment
            })
        ],
        # upload_new_dataset publishes the results under this name
        upload_name='v1-ds-gpt-4o-mini',
    )


# To execute the task (top-level await works inside a Jupyter notebook)
example_task = example_eval()
await example_task.execute()
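
Outside a notebook there is no running event loop, so a standalone script would drive the same task with asyncio.run instead of a bare await. A minimal sketch:

import asyncio

if __name__ == '__main__':
    asyncio.run(example_eval().execute())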

You can download the full Jupyter notebook from here.