This is a continuation of the previous article on generating MongoDB queries using the OpenAI Chat Completions API. In this article, we will look at how to construct MongoDB queries using the OpenAI Assistants API, given an input and an expected output.
In this article, we will cover the following:
- Create an assistant using Assistants API and use it to generate MongoDB queries.
- Use custom function tool to format the output in JSON format.
- Use custom function tool to verify that the generated query is accurate. We will mock up the verification step to keep the example simple.
Please refer to the Code section at the end of the article for the link to the repo containing the Jupyter Notebook used in this article.
Create a MongoDB assistant
Let’s start with creating an assistant using the Assistants API and use it to generate MongoDB queries for a given input data and expected output.
Data
Below is the input data and the expected output data that we will use for testing throughout this example.
input_data = """
[
{ "name": "Sachin", "team": "India" },
{ "name": "Sourav", "team": "India" },
{ "name": "Lara", "team": "West Indies" }
]
"""
output_data = """
[
{ "team": "India", "playerCount": 2 },
{ "team": "West Indies", "playerCount": 1 }
]
"""
Basic Setup
Install and import the packages, then instantiate the OpenAI client. The OpenAI API key is stored in a .env file and loaded as an environment variable.
!pip install openai
!pip install python-dotenv
!pip install json5
from openai import OpenAI
import os
from dotenv import load_dotenv
import time
import json5
import json
_ = load_dotenv()
openai_client = OpenAI()
GPT3_MODEL = "gpt-3.5-turbo-1106"
GPT4_MODEL = "gpt-4-1106-preview"
Prompts
The assistant_instructions defines what the assistant is about and its characteristics.
The user_prompt is used to create user messages by passing in the input and expected output data from the user; these messages then become part of a thread.
A thread in the Assistants API represents a single user session comprising the messages exchanged between the user and the AI assistant.
assistant_instructions = """You are a MongoDB expert
with great expertise in writing MongoDB queries for any given data
to produce an expected output."""
user_prompt = f"""Your task is to write a MongoDB Query,
specifically an aggregation pipeline that would produce
the expected output for the given input.
You will always return a JSON response with the following fields.
```
mongoDBQuery: The MongoDB aggregation pipeline to produce
the expected output for a given input. This field corresponds
to just the list of stages in the aggregation pipeline
and shouldn't contain the "db.collection.aggregate" prefix.
queryExplanation: A detailed explanation for the query
that was returned.
```
Input data: {input_data}
Expected output data: {output_data}
"""
Create an assistant
Let’s create an assistant named “MongoDB assistant” by passing in the assistant_instructions defined above. Choose either the GPT-3.5 or the GPT-4 model.
We will also store the assistant’s ID for later use in the example, allowing us to update the assistant with custom tools.
assistant = openai_client.beta.assistants.create(
    name="MongoDB assistant",
    instructions=assistant_instructions,
    model=GPT3_MODEL
)
MONGO_DB_ASSISTANT_ID = assistant.id
Create a thread and add messages
Now, we will create a thread to start a user conversation and add a single message to the thread using the user_prompt defined above.
thread = openai_client.beta.threads.create()
message = openai_client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content=user_prompt
)
Invoke the assistant
In the Assistants API, we need to create a run object to invoke or execute the assistant, passing in the thread ID and the assistant ID.
run = openai_client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id
)
This is an asynchronous invocation, and the run object will go through a sequence of statuses, starting with “queued”, moving to “in_progress”, and finally reaching “completed”. We need to poll at regular intervals to detect when the run’s status turns to “completed”. Once the run is in the “completed” status, we can retrieve the messages from the thread to get the assistant’s response.
In the code below, we poll every 2 seconds for a total of 2 minutes. In each iteration, we retrieve the latest run object and check whether its status is “completed”. If it is, we retrieve the data from the first message, which is the assistant’s response.
for i in range(60):
    try:
        updated_run = openai_client.beta.threads.runs.retrieve(
            thread_id=thread.id,
            run_id=run.id
        )
        # Check if the status indicates completion
        if updated_run.status == "completed":
            messages = openai_client.beta.threads.messages.list(
                thread_id=thread.id
            )
            print(f"Assistants Response: {messages.data[0].content[0].text.value}")
            break
    except Exception as e:
        print(f"Error: {str(e)}. Trying again...")
    finally:
        time.sleep(2)
else:
    # If max_attempts reached without completion, then assistant call timed out
    print("Timeout: Assistant didn't respond in time. Please try again.")
Output
Below is a sample output from the assistant providing us with the MongoDB query for the given input and expected output data along with the explanation for the query.
Assistants Response: Here's the MongoDB aggregation query for the given input:
```json
[
  {
    $group: {
      _id: "$team",
      playerCount: { $sum: 1 }
    }
  },
  {
    $project: {
      team: "$_id",
      playerCount: 1,
      _id: 0
    }
  }
]
```
Explanation:
1. `$group` stage: Groups the input documents by the "team" field and calculates the player count for each group using the `$sum` accumulator.
2. `$project` stage: Reshapes the output to rename the `_id` field as "team" and includes the "playerCount" field while excluding the default `_id` field.
Format the assistant response using function calling
If we want to use the assistant’s output in downstream systems, display it in a UI, or include it in an API response, we need the response in a reliable format, typically one adhering to a JSON schema.
Since the Assistants API does not support a JSON response format the way the Chat Completions API does, we will use a custom function tool and read the function arguments generated by the assistant to get the response in the expected format.
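For contrast, below is a minimal sketch of how the same requirement could be met with the Chat Completions API, which does support a JSON response format directly (as covered in the previous article). It reuses the assistant_instructions and user_prompt defined above and is only illustrative, not part of this example.
# Minimal sketch (assumption: using Chat Completions JSON mode instead of the Assistants API).
# response_format={"type": "json_object"} makes the model return valid JSON directly,
# so no function tool is needed for formatting.
completion = openai_client.chat.completions.create(
    model=GPT3_MODEL,
    response_format={"type": "json_object"},
    messages=[
        {"role": "system", "content": assistant_instructions},
        {"role": "user", "content": user_prompt},
    ],
)
formatted = json.loads(completion.choices[0].message.content)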
Update the assistant using custom function tool
Define a custom function and pass it as a tool to the assistant. This function will have two parameters – mongoDBQuery and queryExplanation – the ones that we need from the assistant in the response.
assistant = openai_client.beta.assistants.update(
    MONGO_DB_ASSISTANT_ID,
    tools=[
        {
            "type": "function",
            "function": {
                "name": "formatResponse",
                "description": """Format the assistant's response
                before responding to user""",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "mongoDBQuery": {
                            "type": "string",
                            "description": """The MongoDB aggregation pipeline
                            to produce the expected output for a given input.
                            This field corresponds to just the list of stages
                            in the aggregation pipeline and shouldn't contain
                            the 'db.collection.aggregate' prefix."""
                        },
                        "queryExplanation": {
                            "type": "string",
                            "description": "A detailed explanation for the query that was returned."
                        }
                    },
                    "required": ["mongoDBQuery", "queryExplanation"]
                }
            }
        }
    ]
)
User Prompt
Now that we have created the assistant with the custom function tool, we need to instruct it to always invoke the function, so that we get a formatted response. So, we will modify our previous user prompt as follows.
We will mark the instruction “Important”, so that the assistant honors our request and always decides to invoke the function.
user_prompt = f"""Your task is to write a MongoDB Query,
specifically an aggregation pipeline that would produce
the expected output for the given input.
Important: You will always format the response using the
formatResponse tool before responding to user.
Input data: {input_data}
Expected output data: {output_data}
"""
Create a thread, add the message and run the assistant
Let’s create a thread, add the message and run the assistant just like we did last time.
thread = openai_client.beta.threads.create()
message = openai_client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content=user_prompt
)
run = openai_client.beta.threads.runs.create(
    thread_id=thread.id,
    assistant_id=assistant.id
)
Retrieve the assistant response
This time, since we have asked the assistant to always call the function, we will look for the “requires_action” status on the run object, as that is the status set when the assistant decides to use one of its tools, in this case our custom function.
We will retrieve the function arguments from the tool_calls array. Since we have just one tool here, we can safely read it from the first index of the array.
Note: The reason for using the JSON5 library to parse the arguments is that in the assistant’s response, the quotes around the aggregation pipeline stage names are sometimes missing, which makes the string difficult to parse into a Python dict. JSON5 resolves that issue and parses the string properly even when the quotes are missing.
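For instance, here is a small illustration of the kind of string JSON5 can handle while the standard json module cannot (the raw string below is made up for demonstration):
# Illustration: unquoted keys such as $group are not valid JSON but are valid JSON5.
raw = '[{ $group: { _id: "$team", playerCount: { $sum: 1 } } }]'
# json.loads(raw) would raise json.JSONDecodeError because of the unquoted keys
pipeline = json5.loads(raw)
print(pipeline[0]["$group"]["playerCount"])  # {'$sum': 1}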
for i in range(60):
    try:
        updated_run = openai_client.beta.threads.runs.retrieve(
            thread_id=thread.id,
            run_id=run.id
        )
        # Check if the assistant has decided to call the custom function
        if updated_run.status == "requires_action":
            assistant_response = json5.loads(
                updated_run.required_action.submit_tool_outputs.tool_calls[0].function.arguments
            )
            print(f"Assistant Response:\n{assistant_response}")
            break
    except Exception as e:
        print(f"Error: {str(e)}. Trying again...")
    finally:
        time.sleep(2)
else:
    # If max_attempts reached without completion, then assistant call timed out
    print("Timeout: Assistant didn't respond in time. Please try again.")
Output
Below is the formatted output from the assistant’s response, now available in a predictable structure that follows a standard JSON schema with two fields – mongoDBQuery and queryExplanation.
Assistant Response:
{'mongoDBQuery': '[{"$group":{"_id":"$team","playerCount":{"$sum":1}}},
{"$project":{"team":"$_id","playerCount":1,"_id":0}}]',
'queryExplanation': "This query uses the $group stage to group the
documents by the 'team' field, and then uses the $project stage to
reshape the output to include the 'team' and 'playerCount' fields
while excluding the _id field."}
Verify that the generated query is accurate
Now that we have the output in a dependable format, we can take this example further and instruct the assistant to verify the generated query by executing it on a MongoDB instance. This gives the assistant a way to check whether the resulting output matches the user-provided expected output. In cases where it doesn’t match, the assistant can iteratively refine the query and verify it until an accurate match is achieved.
To keep the example simple, we will mock this query verification and return a hardcoded response. If you would like to see an actual implementation of the verification process against a MongoDB instance, please check out the previous blog post for details.
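For reference, a real verification step might look something like the sketch below. This is a minimal sketch only: it assumes a local MongoDB instance reachable via pymongo, and the database and collection names are hypothetical.
# Hypothetical sketch of real query verification (not used in this example):
# run the generated pipeline on a scratch collection and compare the result
# with the expected output. Requires pymongo and a running MongoDB instance.
from pymongo import MongoClient

def verify_query(mongo_db_query, input_data, output_data):
    client = MongoClient("mongodb://localhost:27017")         # assumed local instance
    collection = client["scratch_db"]["query_verification"]   # hypothetical names
    collection.delete_many({})
    collection.insert_many(json5.loads(input_data))
    actual = [
        {k: v for k, v in doc.items() if k != "_id"}
        for doc in collection.aggregate(json5.loads(mongo_db_query))
    ]
    expected = json5.loads(output_data)
    # Compare ignoring document order, since aggregation output order isn't guaranteed
    normalize = lambda docs: sorted(json.dumps(d, sort_keys=True) for d in docs)
    return "success" if normalize(actual) == normalize(expected) else "failure"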
Create a new custom function
We will create a new custom function and add it as a tool to the assistant for this query verification step. This function’s interface is the same as the one we used for formatting the response, but with a different name and description.
execute_query_function = {
    "name": "executeQuery",
    "description": """Execute the MongoDB Query on the given input data
    to verify the output""",
    "parameters": {
        "type": "object",
        "properties": {
            "mongoDBQuery": {
                "type": "string",
                "description": """The MongoDB aggregation pipeline to
                produce the expected output for a given input.
                This field corresponds to just the list of stages
                in the aggregation pipeline and shouldn't contain
                the 'db.collection.aggregate' prefix."""
            },
            "queryExplanation": {
                "type": "string",
                "description": """A detailed explanation for the query
                that was returned."""
            }
        },
        "required": ["mongoDBQuery", "queryExplanation"]
    }
}
Update the assistant with the new custom function
Let’s update the assistant to add this new custom function as a tool in the tools array.
assistant = openai_client.beta.assistants.update(
    MONGO_DB_ASSISTANT_ID,
    tools=[
        {
            "type": "function",
            "function": execute_query_function
        }
    ]
)
Utility functions
A few utility functions are defined below to manage the back-and-forth conversation with the assistant.
- The process_user_input function will be used to manage the user conversation with the assistant by creating a thread, adding messages to it and invoking the assistant for that thread.
- The get_completed_run function operates akin to an asynchronous wait function. It awaits and returns the run object when its status changes to ‘completed‘ or ‘requires_action.’ This is achieved through periodic polling, fetching the latest run object, and checking its status.
- The execute_query function is the actual function that will be invoked when the assistant decides to use the custom function tool – executeQuery. In this implementation we are returning just a mock response without actually executing the query.
def process_user_input(user_input):
    # Create a new thread
    thread = openai_client.beta.threads.create()
    # Add a message with the user query to the thread
    message = openai_client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_input
    )
    # Create a run to invoke the assistant
    run = openai_client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id
    )
    return thread, run

def get_completed_run(thread, run, max_attempts=60, sleep_interval=2):
    for i in range(max_attempts):
        try:
            run = openai_client.beta.threads.runs.retrieve(
                thread_id=thread.id,
                run_id=run.id
            )
            # Check if the status indicates completion
            if run.status == "completed" or run.status == "requires_action":
                return run
        except Exception as e:
            print(f"Error: {str(e)}. Trying again...")
        finally:
            time.sleep(sleep_interval)
    else:
        # If max_attempts reached without completion, then assistant timed out
        return None

# Custom function tool
def execute_query(mongoDBQuery):
    return "success"
User Prompt
The user prompt is modified to instruct the assistant to always verify the query before returning the response to the user. This makes the assistant always call the custom function tool – executeQuery.
user_prompt = f"""
Your task is to write a MongoDB Query, specifically an aggregation pipeline
that would produce the expected output for the given input.
Important: You will always execute the query to verify that it produces
the expected output.
Input data: {input_data}
Expected output data: {output_data}
"""
Execute the custom function call
This is the final step in the implementation.
- First, we invoke the process_user_input function to create the thread and start a run, then use get_completed_run to get the latest run object containing the assistant's response.
- Second, we implement a loop so that we can invoke the assistant iteratively until it has produced an accurate query.
- Finally, within each iteration of the loop, we check whether the run status is ‘requires_action‘ and, if so, invoke the custom function (execute_query) with the mongoDBQuery value returned by the assistant in the function arguments. If the execute_query call returns ‘success‘, we exit the loop and print the corresponding query as the final output. Conversely, if execute_query returns “failure“, we trigger the assistant once more by submitting the tool output for another iteration.
In this implementation, we loop 3 times, but this can be adjusted based on how many times we want to retry and give the assistant a chance to arrive at an accurate solution.
Play around with this example by changing the return value to “failure” in the execute_query function. This should make the assistant retry producing the query for this data and then verify it again, as sketched below.
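For example, a hypothetical variant of the mock like the one below fails the first verification attempt and succeeds on the retry, which exercises the tool-output resubmission path in the loop that follows:
# Hypothetical variant of the mock: fail the first verification attempt so the
# assistant has to refine the query and resubmit, then succeed on the retry.
verification_attempts = {"count": 0}

def execute_query(mongoDBQuery):
    verification_attempts["count"] += 1
    return "failure" if verification_attempts["count"] == 1 else "success"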
thread, run = process_user_input(user_prompt)
for i in range(3):
    run = get_completed_run(thread, run)
    if run:
        if run.status == "requires_action":
            tool_call = run.required_action.submit_tool_outputs.tool_calls[0]
            function_name = tool_call.function.name
            arguments = json5.loads(tool_call.function.arguments)
            print(f"Function Name: {function_name}\nArguments: {arguments}")
            response = execute_query(arguments["mongoDBQuery"])
            if response == "success":
                print(f"Assistant Response - MongoDB Query: {arguments['mongoDBQuery']}")
                break
            else:
                run = openai_client.beta.threads.runs.submit_tool_outputs(
                    thread_id=thread.id,
                    run_id=run.id,
                    tool_outputs=[
                        {
                            "tool_call_id": tool_call.id,
                            "output": """The generated MongoDB Query
                            didn't produce the expected output.
                            Please try again"""
                        }
                    ]
                )
        elif run.status == "completed":
            messages = openai_client.beta.threads.messages.list(thread.id)
            print(f"Assistant Response: {messages.data[0].content[0].text.value}")
            break
    else:
        print("Error: Assistant timed out.")
else:
    print("Error: Assistant couldn't produce the query for the given input.")
Output
Function Name: executeQuery
Arguments: {'mongoDBQuery': '[\n {\n "$group": {\n "_id": "$team",\n "playerCount": {"$sum": 1}\n }\n },\n {\n "$project": {\n "team": "$_id",\n "playerCount": 1,\n "_id": 0\n }\n }\n]', 'queryExplanation': "The query groups the documents by the 'team' field and calculates the count of players for each team. Then it projects the 'team' and 'playerCount' fields, and removes the '_id' field from the output."}
Assistant Response - MongoDB Query: [
  {
    "$group": {
      "_id": "$team",
      "playerCount": {"$sum": 1}
    }
  },
  {
    "$project": {
      "team": "$_id",
      "playerCount": 1,
      "_id": 0
    }
  }
]
Code
The complete example discussed above can be found in this Jupyter Notebook – MongoDB Query generator with Assistants API
Summary
In summary, we saw how to use the Assistants API to generate MongoDB queries, how to use a custom function tool to ensure a reliable JSON output format, and how to leverage custom function tools to execute custom logic and submit the results back, creating an agent-like experience with the Assistants API.
Thanks for reading and happy coding!