Auto-generate MongoDB Queries with OpenAI Assistants API

This is a continuation of the previous article on generating MongoDB queries using the OpenAI Chat Completions API. In this article, we will look at how to construct MongoDB queries using the OpenAI Assistants API, given an input and an expected output.

In this article, we will cover the following:

  • Create an assistant using the Assistants API and use it to generate MongoDB queries.
  • Use a custom function tool to format the output as JSON.
  • Use a custom function tool to verify that the generated query is accurate. We will mock up the verification step to keep the example simple.

Please refer to the Code section at the end of the article for the link to the repo containing the Jupyter Notebook used in this article.

Create a MongoDB assistant

Let’s start by creating an assistant using the Assistants API and using it to generate MongoDB queries for a given input and expected output.

Data

Below is the input data and the expected output data that we will use for testing throughout this example.

input_data = """
[
  { "name": "Sachin", "team": "India" },
  { "name": "Sourav", "team": "India" },
  { "name": "Lara", "team": "West Indies" }
]
"""

output_data = """
[
 { "team": "India", "playerCount": 2 },
 { "team": "West Indies", "playerCount": 1 }
]
"""

Basic Setup

Install and import the packages, then instantiate the OpenAI client. The OpenAI API key is stored in a .env file and loaded as an environment variable.

!pip install openai
!pip install python-dotenv
!pip install json5

from openai import OpenAI
import os
from dotenv import load_dotenv
import time
import json5
import json

_ = load_dotenv()
openai_client = OpenAI()

GPT3_MODEL = "gpt-3.5-turbo-1106"
GPT4_MODEL = "gpt-4-1106-preview"

Prompts

The assistant_instructions define the assistant's role and characteristics.

The user_prompt is used to create user messages by passing in the input and expected output data from the user, which then become part of a thread.

A thread in the Assistants API represents a single user session, comprising the messages exchanged between the user and the AI assistant.

assistant_instructions = """You are a MongoDB expert 
with great expertise in writing MongoDB queries for any given data 
to produce an expected output."""

user_prompt = f"""Your task is to write a MongoDB Query, 
specifically an aggregation pipeline that would produce 
the expected output for the given input.

You will always return a JSON response with the following fields.
```
mongoDBQuery: The MongoDB aggregation pipeline to produce 
the expected output for a given input. This field corresponds 
to just the list of stages in the aggregation pipeline 
and shouldn't contain the "db.collection.aggregate" prefix.
    
queryExplanation: A detailed explanation for the query 
that was returned.
```
    
Input data: {input_data} 
Expected output data: {output_data}
"""

Create an assistant

Let’s create an assistant named “MongoDB assistant” by passing in the assistant_instructions defined above, choosing either the GPT-3.5 or GPT-4 model.

We will also store the assistant’s ID for later use in the example, so that we can update it with custom tools.

assistant = openai_client.beta.assistants.create(
    name="MongoDB assistant",
    instructions=assistant_instructions,
    model=GPT3_MODEL
)

MONGO_DB_ASSISTANT_ID = assistant.id

Create a thread and add messages

Now, we will create a thread to start a user conversation and add a single message to the thread using the user_prompt defined above.

thread = openai_client.beta.threads.create()

message = openai_client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content=user_prompt
)

Invoke the assistant

In the Assistants API, we need to create a run object to invoke or execute the assistant by passing in the thread and the assistant id.

run = openai_client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id
)

This is an asynchronous invocation, and the run object goes through a sequence of statuses, starting with "queued", moving to "in_progress", and finally reaching "completed". We will need to poll at regular intervals to detect when the run's status turns to "completed". Once the run is "completed", we can retrieve the messages from the thread to get the assistant's response.

In the code below, we poll every 2 seconds for up to 2 minutes. In each iteration, we retrieve the latest run object and check if the status is "completed". If it is, we retrieve the data from the first message, which is the response from the assistant.

for i in range(60):
    try:
        updated_run = openai_client.beta.threads.runs.retrieve(
          thread_id=thread.id,
          run_id=run.id
        )
        # Check if the status indicates completion
        if updated_run.status == "completed":
            messages = openai_client.beta.threads.messages.list(
                thread_id=thread.id
            )
            print(f"Assistants Response: {messages.data[0].content[0].text.value}")
            break
    except Exception as e:
        print(f"Error: {str(e)}. Trying again...")
    finally:
        time.sleep(2)
else:        
    # If max_attempts reached without completion, then assistant call timed out
    print("Timeout: Assistant didn't respond in time. Please try again.")

Output

Below is a sample output from the assistant providing us with the MongoDB query for the given input and expected output data along with the explanation for the query.

Assistants Response: Here's the MongoDB aggregation query for the given input:

```json
[
  {
    $group: {
      _id: "$team",
      playerCount: { $sum: 1 }
    }
  },
  {
    $project: {
      team: "$_id",
      playerCount: 1,
      _id: 0
    }
  }
]
```

Explanation:
1. `$group` stage: Groups the input documents by the "team" field and calculates the player count for each group using the `$sum` accumulator.
2. `$project` stage: Reshapes the output to rename the `_id` field as "team" and includes the "playerCount" field while excluding the default `_id` field.

Format the assistant response using function calling

If we were to use the output from the assistant in downstream systems, display it in a UI, or make it part of an API response, we would need the response in a reliable format, typically adhering to a JSON schema.

Since the Assistants API does not support a JSON response format like the Chat Completions API does, we will use a custom function tool and read the function arguments generated by the assistant to get the response in the expected format.

Update the assistant using custom function tool

Define a custom function and pass it as a tool to the assistant. This function will have two parameters – mongoDBQuery and queryExplanation – the ones that we need from the assistant in the response.

assistant = openai_client.beta.assistants.update(
    MONGO_DB_ASSISTANT_ID,
    tools=[
        {
            "type": "function",
            "function": {
                "name": "formatResponse",
                "description": """Format the assistant's response 
                                  before responding to user""",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "mongoDBQuery": {
                            "type": "string",
                            "description": """The MongoDB aggregation pipeline
                            to produce the expected output for a given input.
                            This field corresponds to just the list of stages 
                            in the aggregation pipeline and shouldn't contain 
                            the 'db.collection.aggregate' prefix."""
                        },
                        "queryExplanation": {
                            "type": "string",
                            "description": "A detailed explanation for \
                                            the query that was returned."
                        } 
                    },
                    "required": ["mongoDBQuery", "queryExplanation"]
                }
            }
        }
    ]
)

User Prompt

Now that we have created the assistant with the custom function tool, we need to instruct it to always invoke the function so that we get a formatted response. We will modify our previous user prompt as follows.

We will mark the instruction “Important”, so that the assistant honors our request and always decides to invoke the function.

user_prompt = f"""Your task is to write a MongoDB Query, 
specifically an aggregation pipeline that would produce 
the expected output for the given input.

Important: You will always format the response using the 
formatResponse tool before responding to user. 
    
Input data: {input_data} 
Expected output data: {output_data}
"""

Create a thread, add the message and run the assistant

Let’s create a thread, add the message and run the assistant just like we did last time.

thread = openai_client.beta.threads.create()

message = openai_client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content=user_prompt
)

run = openai_client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id
)

Retrieve the assistant response

This time, since we have asked the assistant to always call the function, we will look for the "requires_action" status on the run object, as that is the status set when the assistant decides to use one of its tools, in this case a custom function.

We will retrieve the function arguments from the tool_calls array. Since we have just one tool here, we can safely get it from the starting index in the array.

Note: We use the JSON5 library to parse the arguments because the assistant's response sometimes omits the quotes around the aggregation pipeline stage names, which makes it impossible to parse with the standard json module. JSON5 handles this and parses the response correctly even when the quotes are missing.
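
For illustration, here is a minimal sketch (assuming the json5 package is installed) of how JSON5 parses an example string with unquoted stage names, which the standard json module would reject.

import json5

# Example of arguments the assistant may return with an unquoted stage name;
# json.loads() would raise a JSONDecodeError on this string.
raw_arguments = '[{$group: {"_id": "$team", "playerCount": {"$sum": 1}}}]'

# json5 accepts unquoted keys and parses the string into a Python list
pipeline = json5.loads(raw_arguments)
print(pipeline)
# [{'$group': {'_id': '$team', 'playerCount': {'$sum': 1}}}]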

for i in range(60):
    try:      
        updated_run = openai_client.beta.threads.runs.retrieve(
          thread_id=thread.id,
          run_id=run.id
        )
        # Check if the status indicates completion
        if updated_run.status == "requires_action":
            assistant_response = json5.loads(updated_run.required_action\
                                             .submit_tool_outputs\
                                             .tool_calls[0].function.arguments)
            print(f"Assistant Response:\n{assistant_response}")
            break
    except Exception as e:
        print(f"Error: {str(e)}. Trying again...")
    finally:
        time.sleep(2)
else:        
    # If max_attempts reached without completion, then assistant call timed out
    print("Timeout: Assistant didn't respond in time. Please try again.")

Output

Below is the formatted output from the assistant, now available as a Python dict that follows the expected JSON schema containing two fields – mongoDBQuery and queryExplanation.

Assistant Response:
{'mongoDBQuery': '[{"$group":{"_id":"$team","playerCount":{"$sum":1}}},
{"$project":{"team":"$_id","playerCount":1,"_id":0}}]', 
'queryExplanation': "This query uses the $group stage to group the 
documents by the 'team' field, and then uses the $project stage to 
reshape the output to include the 'team' and 'playerCount' fields 
while excluding the _id field."}
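
Since mongoDBQuery comes back as a string, downstream code would typically parse it into a list of pipeline stages before use. A minimal sketch, assuming the assistant_response dict from above:

# Parse the stringified aggregation pipeline into a list of stage dicts
pipeline = json5.loads(assistant_response["mongoDBQuery"])

for stage in pipeline:
    print(stage)
# {'$group': {'_id': '$team', 'playerCount': {'$sum': 1}}}
# {'$project': {'team': '$_id', 'playerCount': 1, '_id': 0}}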

Verify that the generated query is accurate

Now that we have the output in a dependable format, we can take this example further and instruct the assistant to verify the generated query by executing it on a MongoDB instance. This gives the assistant a way to check whether the resulting output matches the user-provided expected output. When the output doesn't match, the assistant can iteratively refine the query and verify it until an accurate match is achieved.

To keep the example simple, we will mock this query verification and return a hardcoded response. If you would like to see an actual implementation of the verification process against a MongoDB instance, please check out the previous blog post for details.
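
For reference, a real verification step might look roughly like the sketch below. It assumes a locally running MongoDB instance, the pymongo driver, and placeholder database and collection names; the rest of this article continues with the mocked version.

import json5
from pymongo import MongoClient

def verify_query(mongo_db_query, input_data, output_data):
    # Placeholder connection string, database and collection names
    client = MongoClient("mongodb://localhost:27017")
    collection = client["scratch_db"]["players"]

    # Load the user-provided input documents into a scratch collection
    collection.delete_many({})
    collection.insert_many(json5.loads(input_data))

    # Run the generated aggregation pipeline and drop MongoDB's _id field
    pipeline = json5.loads(mongo_db_query)
    actual = [{k: v for k, v in doc.items() if k != "_id"}
              for doc in collection.aggregate(pipeline)]
    expected = json5.loads(output_data)

    # Compare ignoring document order, since $group output order isn't guaranteed
    def sort_key(doc):
        return sorted(doc.items())

    matches = sorted(actual, key=sort_key) == sorted(expected, key=sort_key)
    return "success" if matches else "failure"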

Create a new custom function

We will create a new custom function and add it to the assistant as a tool for this query verification step. The function's interface is the same as the one we used for formatting the response, but with a different name and description.

execute_query_function = {
    "name": "executeQuery",
    "description": """Execute the MongoDB Query on the given input data 
                      to verify the output""",
    "parameters": {
        "type": "object",
        "properties": {
            "mongoDBQuery": {
                "type": "string",
                "description": """The MongoDB aggregation pipeline to 
                                  produce the expected output 
                                  for a given input. This field 
                                  corresponds to just the list of 
                                  stages in the aggregation pipeline 
                                  and shouldn't contain the 
                                  'db.collection.aggregate' prefix."""
            },
            "queryExplanation": {
                "type": "string",
                "description": """A detailed explanation for the query 
                                that was returned."""
            } 
        },
        "required": ["mongoDBQuery", "queryExplanation"]
    }
}

Update the assistant with the new custom function

Let’s update the assistant to add this new custom function as a tool in the tools array.

assistant = openai_client.beta.assistants.update(
    MONGO_DB_ASSISTANT_ID,
    tools = [
        {
            "type": "function", 
            "function": execute_query_function
        }
    ]
)

Utility functions

We define a few utility functions to manage the back-and-forth conversation with the assistant.

  • The process_user_input function manages the user conversation with the assistant by creating a thread, adding the user message to it, and invoking the assistant for that thread.
  • The get_completed_run function acts as an asynchronous wait: it polls periodically, fetching the latest run object and checking its status, and returns the run once its status changes to "completed" or "requires_action".
  • The execute_query function is the actual function invoked when the assistant decides to use the custom function tool – executeQuery. In this implementation we return just a mock response without actually executing the query.

def process_user_input(user_input):
    #Create a new thread
    thread = openai_client.beta.threads.create()
    
    #Add a message with the user query to the thread
    message = openai_client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_input
    )
    
    #Create a run to invoke the assistant
    run = openai_client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id
    )
    return thread, run

def get_completed_run(thread, run, max_attempts=60, sleep_interval=2):
    for i in range(max_attempts):
        try:
            run = openai_client.beta.threads.runs.retrieve(
              thread_id=thread.id,
              run_id=run.id
            )
            # Check if the status indicates completion
            if run.status == "completed" or run.status == "requires_action":
                return run
        except Exception as e:
            print(f"Error: {str(e)}. Trying again...")
        finally:
            time.sleep(sleep_interval)
    else:        
        # If max_attempts reached without completion, then assistant timed out
        return None

# Custom function tool
def execute_query(mongoDBQuery):
    return "success"

User Prompt

The user prompt is modified to instruct the assistant to always verify the query before returning the response to the user. This makes the assistant always call the custom function tool – executeQuery.

user_prompt = f"""
Your task is to write a MongoDB Query, specifically an aggregation pipeline
that would produce the expected output for the given input.

Important: You will always execute the query to verify that it produces 
the expected output.

Input data: {input_data} 
Expected output data: {output_data}
"""

Execute the custom function call

This is the final step in the implementation.

  • First, we invoke the process_user_input function to create the thread and the run, and then call get_completed_run to get the latest run object containing the assistant's response.
  • Second, we wrap this in a loop so that we can invoke the assistant iteratively until it has produced an accurate query.
  • Finally, within each iteration of the loop, we check whether the run status is "requires_action" and, if so, invoke the custom function (execute_query) with the mongoDBQuery value returned by the assistant in the function arguments. If the execute_query call yields a "success" response, we exit the loop and return the corresponding query as the final output. Conversely, if the response is "failure", we trigger the assistant once more by submitting the tool output for further iterations.

In this implementation, we loop up to 3 times, but that can be adjusted based on how many times we want to let the assistant retry and arrive at an accurate solution.

Play around with this example by changing the return value to "failure" in the execute_query function. This will make the assistant retry producing the query for this data and then verify it again.
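
For example, a hypothetical stand-in for execute_query that fails on the first call and succeeds afterwards would exercise the retry path:

# Hypothetical mock: report "failure" on the first attempt and "success" afterwards,
# forcing the assistant to refine the query once before we accept it.
attempt_count = {"value": 0}

def execute_query(mongoDBQuery):
    attempt_count["value"] += 1
    return "failure" if attempt_count["value"] == 1 else "success"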

thread, run = process_user_input(user_prompt)

for i in range(3):
    run = get_completed_run(thread, run)

    if run:
        if run.status == "requires_action": 
            tool_call = run.required_action.submit_tool_outputs.tool_calls[0]
            function_name = tool_call.function.name
            arguments = json5.loads(tool_call.function.arguments)
            print(f"Function Name: {function_name}\nArguments: {arguments}")
            
            response = execute_query(arguments["mongoDBQuery"])
            if response == "success":
                print(f"Assistant Response - MongoDB Query: {arguments['mongoDBQuery']}")
                break
            else:
                run = openai_client.beta.threads.runs.submit_tool_outputs(
                    thread_id=thread.id,
                    run_id=run.id,
                    tool_outputs=[
                        {
                            "tool_call_id": tool_call.id,
                            "output": """The generated MongoDB Query 
                                         didn't produce the expected output.
                                         Please try again"""
                        }
                    ]
                )
        elif run.status == "completed":
            messages = openai_client.beta.threads.messages.list(thread.id)
            print(f"Assistant Response: {messages.data[0].content[0].text.value}")
            break
    else: 
        print("Error: Assistant timed out.")
else:
    print("Error: Assistant couldn't produce the query for the given input.")

Output

Function Name: executeQuery

Arguments: {'mongoDBQuery': '[\n  {\n    "$group": {\n      "_id": "$team",\n      "playerCount": {"$sum": 1}\n    }\n  },\n  {\n    "$project": {\n      "team": "$_id",\n      "playerCount": 1,\n      "_id": 0\n    }\n  }\n]', 'queryExplanation': "The query groups the documents by the 'team' field and calculates the count of players for each team. Then it projects the 'team' and 'playerCount' fields, and removes the '_id' field from the output."}

Assistant Response - MongoDB Query: [
  {
    "$group": {
      "_id": "$team",
      "playerCount": {"$sum": 1}
    }
  },
  {
    "$project": {
      "team": "$_id",
      "playerCount": 1,
      "_id": 0
    }
  }
]

Code

The complete example discussed above can be found in this Jupyter Notebook – MongoDB Query generator with Assistants API

Summary

In summary, we saw how to use the Assistants API to generate MongoDB queries, use a custom function to ensure a reliable JSON output format, and leverage custom function tools to execute custom logic and submit the results back, creating an agent-like experience with the Assistants API.

Thanks for reading and happy coding!
