Integrating DynamoGuard
To integrate DynamoGuard with your application, you first need to create a remote model in the Dynamo AI platform. This can be done through the UI.
Managed Inference
After setting up a model, you can integrate DynamoGuard into your application using our managed inference solution. With this solution, instead of calling your base model directly, your application calls the DynamoGuard chat endpoint. DynamoGuard then applies all guardrails enabled for the specified model to both the user input and the response returned by the model.
import requests

model_id = '<MODEL_ID>'  # The specific model ID to test against
session_id = '<SESSION_ID>'  # Session ID for this conversation
api_key = '<API_KEY>'  # Dynamo AI API key
USER_REQUEST = "What should I invest in?"  # User input

headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {api_key}',
}

json_data = {
    'messages': [
        {
            'role': 'user',
            'content': USER_REQUEST,
        },
    ],
}

response = requests.post(
    f'https://api.dynamo.ai/v1/moderation/{model_id}/chat/{session_id}',
    headers=headers,
    json=json_data,
)
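You can then inspect the returned response as with any other HTTP request. The exact response schema of the chat endpoint is not covered here, so this simply prints the raw payload:

# Inspect the status and raw JSON body returned by the managed inference endpoint
print(response.status_code)
print(response.json())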
Custom Integration
A custom integration with DynamoGuard can be built using the analyze endpoint, as shown below. In this example, we use OpenAI's GPT-3.5 model and demonstrate how to use the optional ragContext parameter.
import requests
from openai import OpenAI

policy_id = "<POLICY_ID>"  # The specific policy ID to evaluate
model_id = "<MODEL_ID>"  # The model ID to associate the logs with
api_key = "<API_KEY>"  # Dynamo AI API key
USER_REQUEST = "What should I invest in?"  # User input
RAG_CONTEXT = (
    "There are five different asset classes available."  # Retrieved context from RAG (Optional)
)

client = OpenAI()  # Creating an OpenAI client

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}",
}

# Running DynamoGuard on the user input
data = {
    "messages": [{"role": "user", "content": USER_REQUEST, "ragContext": RAG_CONTEXT}],
    "textType": "MODEL_INPUT",
    "policyIds": [policy_id],
    "modelId": model_id,
}
response = requests.post(
    "https://api.dynamo.ai/v1/moderation/analyze/", headers=headers, json=data
)

# If the final action is not block, send a request to the model
if response.json()["finalAction"] != "BLOCK":
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": f"{RAG_CONTEXT}\n{USER_REQUEST}"}],
    )
    MODEL_RESPONSE = completion.choices[0].message.content  # Model response from GPT-3.5

    # Running DynamoGuard on the model response
    data = {
        "messages": [
            {"role": "user", "content": USER_REQUEST, "ragContext": RAG_CONTEXT},
            {"role": "assistant", "content": MODEL_RESPONSE},
        ],
        "textType": "MODEL_RESPONSE",
        "policyIds": [policy_id],
    }
    response = requests.post(
        "https://api.dynamo.ai/v1/moderation/analyze/", headers=headers, json=data
    )
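Finally, you would typically act on the outcome of these checks; a minimal sketch, assuming you want to fall back to a canned message whenever the final action is BLOCK:

# Use the model response only if the last check did not block; otherwise fall back
if response.json()["finalAction"] == "BLOCK":
    final_output = "Sorry, I can't help with that request."  # Illustrative fallback text
else:
    final_output = MODEL_RESPONSE
print(final_output)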
Streaming
The DynamoGuard analyze endpoint supports guardrailing streamed outputs from an LLM. Currently, this is done by repeatedly concatenating streamed chunks and analyzing the result: given chunk i, we analyze the concatenation of chunks 1 through i. We are actively developing techniques to decrease token usage.
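As a purely local illustration of what gets analyzed at each step (no API calls are made here):

chunks = ["You could ", "consider ", "index funds."]
seen = ""
for chunk in chunks:
    seen += chunk  # the text analyzed at step i is the concatenation of chunks 1..i
    print(f"analyzed text: {seen!r}")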
There are a couple of ways to use the streaming feature.
Web Socket
The following is an example of integrating OpenAI streaming. We use sessions to group all of the chunks under one monitoring log entry.
First, let's set up our environment:
DYNAMOAI_API_KEY = ""
VPC_API_URL = "ws://api.dynamo.ai"
POLICY_ID = ""
MODEL_ID = None # Optional
OPENAI_API_KEY = ""
OPENAI_MODEL = "gpt-4"
Here are example helper functions for sending and receiving websocket messages and for initializing the websocket connection:
from websocket import create_connection, WebSocket
import json
import logging
from typing import Any

logger = logging.getLogger("stream_example")


def send_ws_msg(ws: WebSocket, event: str, data: dict):
    msg = json.dumps({"event": event, "data": data})
    logger.debug(msg)
    ws.send(msg)


def recv_ws_msg(ws: WebSocket, target_event: str) -> Any:
    while True:
        response = json.loads(ws.recv())
        logger.debug(response)
        if response["event"] == target_event:
            return response["data"]
        elif response["event"] == "error":
            raise ValueError(response)


def create_ws_connection() -> WebSocket:
    ws = create_connection(f"{VPC_API_URL}/v1/moderation/stream/analyze")
    logger.debug("Authorizing")
    send_ws_msg(ws, "auth", {"token": DYNAMOAI_API_KEY})
    recv_ws_msg(ws, "client-info")
    logger.debug("Authorized")
    return ws
Here are some relevant example functions for starting, using, and ending a stream session. Enable debug logging to see the websocket messages.
def start_session(ws: WebSocket, input_prompt: str):
    # Begin a streaming session: preceding messages, policies to apply,
    # and the (optional) model to associate the monitoring log with
    send_ws_msg(
        ws,
        "start",
        {
            "messages": [{"role": "user", "content": input_prompt}],
            "policyIds": [POLICY_ID],
            "modelId": MODEL_ID,
        },
    )
    recv_ws_msg(ws, "session_start")


def analyze(ws: WebSocket, output_chunk: str) -> str:
    # Send a streamed chunk for analysis and return the resulting final action
    send_ws_msg(ws, "analyze", {"text": output_chunk})
    data = recv_ws_msg(ws, "analyze_result")
    return data["finalAction"]


def end_session(ws: WebSocket):
    # End the streaming session
    send_ws_msg(ws, "end", {})
    recv_ws_msg(ws, "session_end")
start_session begins the streaming session: it tells DynamoGuard the preceding messages, the policies to apply, and the (optional) model to associate the log with. The data object is the same as for the analyze REST endpoint. analyze sends each streamed chunk to DynamoGuard; its response is the same as the REST endpoint's, and internally we wait until we have enough tokens to analyze. end_session ends the session.
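For reference, a minimal session using only the helpers above might look like this (the chunk strings are hard-coded here for illustration; in practice they come from your model's stream):

ws = create_ws_connection()
start_session(ws, "What should I invest in?")

# Send each streamed chunk for analysis; stop early if a chunk is blocked
for chunk in ["You could ", "consider ", "diversified index funds."]:
    if analyze(ws, chunk) == "BLOCK":
        break

end_session(ws)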
Now we can use OpenAI streaming:
from typing import Iterator
from openai import OpenAI

client = OpenAI(api_key=OPENAI_API_KEY)


def guarded_chat(ws: WebSocket, prompt: str) -> Iterator[str]:
    start_session(ws, prompt)
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        stream=True,
    )
    for chunk in response:
        choice = chunk.choices[0]
        message = choice.delta.content or ""
        end_of_stream = bool(choice.finish_reason)
        if not end_of_stream:
            action = analyze(ws, message)
            if action == "BLOCK":
                yield f"{message}[BLOCKED]"
                break
        yield message
    end_session(ws)


def chat(prompt: str):
    ws = create_ws_connection()
    for chunk in guarded_chat(ws, prompt):
        print(chunk, end="")
The chat function is the main entry point for this example.
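For example, with the environment variables above filled in, you could run:

if __name__ == "__main__":
    chat("What should I invest in?")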