Integrating DynamoGuard

To integrate DynamoGuard with your application, first create a remote model in the Dynamo AI platform. This can be done through the UI.

Managed Inference

After setting up a model, you can integrate DynamoGuard into your application using our managed inference solution. Instead of calling your base model directly, your application calls the DynamoGuard chat endpoint. Through this endpoint, DynamoGuard applies all guardrails enabled for the specified model to both the user input and the response received from the model.

import requests

model_id = '<MODEL_ID>'  # The specific model ID to test against
api_key = '<API_KEY>'  # Dynamo AI API key

USER_REQUEST = "What should I invest in?"  # User input

headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {api_key}',
}

json_data = {
    'messages': [
        {
            'role': 'user',
            'content': USER_REQUEST,
        },
    ],
}

response = requests.post(
    f'https://api.dynamo.ai/v1/moderation/{model_id}/chat/session_id',
    headers=headers,
    json=json_data
)
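The request above can be factored into a small builder so it can be reused and checked without network access. This is a minimal sketch, not part of the DynamoGuard API: the helper name build_chat_request is hypothetical, and treating the trailing session_id path segment as a caller-supplied value is an assumption (the example above uses it as a literal).

```python
from typing import Any

BASE_URL = "https://api.dynamo.ai/v1/moderation"  # Base URL taken from the example above


def build_chat_request(model_id: str, api_key: str, user_request: str,
                       session_id: str = "session_id") -> dict[str, Any]:
    """Return keyword arguments suitable for requests.post(**kwargs).

    Hypothetical helper: session_id defaults to the literal path segment
    used in the example above; making it a parameter is an assumption.
    """
    return {
        "url": f"{BASE_URL}/{model_id}/chat/{session_id}",
        "headers": {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        "json": {"messages": [{"role": "user", "content": user_request}]},
    }


req = build_chat_request("<MODEL_ID>", "<API_KEY>", "What should I invest in?")
print(req["url"])
# To send it: response = requests.post(**req); response.raise_for_status()
```

Calling raise_for_status() before reading the body surfaces authentication or routing errors early instead of failing later on an unexpected payload.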

Custom Integration

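A custom integration with DynamoGuard can be built on the analyze endpoint, as shown below. This example uses OpenAI's GPT-3.5 model and demonstrates the optional ragContext parameter. DynamoGuard first checks the user input; only if the input is not blocked is the request sent to the model, and the model's response is then checked as well.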

import requests
from openai import OpenAI

policy_id = "<POLICY_ID>"  # The specific policy ID to evaluate
api_key = "<API_KEY>"  # Dynamo AI API key

USER_REQUEST = "What should I invest in?"  # User input
RAG_CONTEXT = "There are five different asset classes available."  # Retrieved context from RAG (optional)

client = OpenAI()  # Creating an OpenAI client (reads OPENAI_API_KEY from the environment)

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}",
}

# Running DynamoGuard on the user input
data = {
    "messages": [{"role": "user", "content": USER_REQUEST, "ragContext": RAG_CONTEXT}],
    "textType": "MODEL_INPUT",
    "policyIds": [policy_id],
    "modelId": "<MODEL_ID>",  # Model to associate the log with (optional)
}

response = requests.post(
    "https://api.dynamo.ai/v1/moderation/analyze/", headers=headers, json=data
)

# If the final action is not block, send a request to the model
if response.json()["finalAction"] != "BLOCK":
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": f"{RAG_CONTEXT}\n{USER_REQUEST}"}],
    )

    # Model response text from GPT-3.5
    MODEL_RESPONSE = completion.choices[0].message.content

    # Running DynamoGuard on the model response
    data = {
        "messages": [
            {"role": "user", "content": USER_REQUEST, "ragContext": RAG_CONTEXT},
            {"role": "assistant", "content": MODEL_RESPONSE},
        ],
        "textType": "MODEL_RESPONSE",
        "policyIds": [policy_id],
    }

    response = requests.post(
        "https://api.dynamo.ai/v1/moderation/analyze/", headers=headers, json=data
    )
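The two analyze calls above differ only in the textType and in whether an assistant message is present, so the payload construction can be shared. The sketch below is illustrative: build_analyze_payload and is_blocked are hypothetical helper names, and omitting ragContext when no context is supplied is an assumption based on the parameter being described as optional.

```python
from typing import Any, Optional


def build_analyze_payload(
    policy_id: str,
    user_request: str,
    rag_context: Optional[str] = None,
    model_response: Optional[str] = None,
) -> dict[str, Any]:
    """Build the analyze request body for an input check or a response check."""
    user_message: dict[str, Any] = {"role": "user", "content": user_request}
    if rag_context is not None:
        user_message["ragContext"] = rag_context  # Optional retrieved context
    messages = [user_message]
    if model_response is not None:
        # A response check includes the assistant message after the user message
        messages.append({"role": "assistant", "content": model_response})
    return {
        "messages": messages,
        "textType": "MODEL_INPUT" if model_response is None else "MODEL_RESPONSE",
        "policyIds": [policy_id],
    }


def is_blocked(result: dict[str, Any]) -> bool:
    """True when the analyze result's finalAction is BLOCK."""
    return result.get("finalAction") == "BLOCK"


payload = build_analyze_payload(
    "<POLICY_ID>",
    "What should I invest in?",
    rag_context="There are five different asset classes available.",
)
print(payload["textType"])
```

The returned dictionary can be passed as the json argument of requests.post, and is_blocked can be applied to response.json() for both the input check and the response check.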

Streaming

The DynamoGuard analyze endpoint supports guardrailing streamed outputs from an LLM. Currently, this is done by repeatedly concatenating streamed chunks and analyzing the result. More specifically, given chunk $C_i$, we analyze the concatenation $C_1 C_2 \cdots C_{i-1} C_i$. We are actively developing techniques to decrease token usage.
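To make the cumulative scheme concrete, the sketch below builds the sequence of texts that would be analyzed for a short stream. The sample chunks are made up, and character counts stand in for tokens; this only illustrates the concatenation described above and is not DynamoGuard's internal implementation.

```python
from itertools import accumulate

# Made-up chunks standing in for a streamed LLM response
chunks = ["Buy ", "index ", "funds."]

# accumulate() concatenates strings left to right: C_1, C_1C_2, C_1C_2C_3, ...
prefixes = list(accumulate(chunks))

streamed_chars = sum(len(c) for c in chunks)    # Text produced by the model
analyzed_chars = sum(len(p) for p in prefixes)  # Text analyzed across all steps

for i, prefix in enumerate(prefixes, start=1):
    print(f"step {i}: analyze {prefix!r}")
print(f"streamed={streamed_chars} analyzed={analyzed_chars}")
```

Because every step re-analyzes the earlier chunks, the total amount of analyzed text grows roughly quadratically with the number of chunks, which is the cost the cumulative approach incurs.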

There are a couple of ways to use the streaming feature.

Web Socket

The following is an example of integrating OpenAI streaming. We use sessions to group all of the chunks under one monitoring log entry.

First, let's set up our environment:

DYNAMOAI_API_KEY = ""
VPC_API_URL = "ws://api.dynamo.ai"
POLICY_ID = ""
MODEL_ID = None # Optional

OPENAI_API_KEY = ""
OPENAI_MODEL = "gpt-4"

Here is an example function for initializing the websocket connection:

from websocket import create_connection, WebSocket
import json
import logging
from typing import Any

logger = logging.getLogger("stream_example")


def send_ws_msg(ws: WebSocket, event: str, data: dict):
    msg = json.dumps({"event": event, "data": data})
    logger.debug(msg)
    ws.send(msg)


def recv_ws_msg(ws: WebSocket, target_event: str) -> Any:
    while True:
        response = json.loads(ws.recv())
        logger.debug(response)
        if response["event"] == target_event:
            return response["data"]
        elif response["event"] == "error":
            raise ValueError(response)


def create_ws_connection() -> WebSocket:
    ws = create_connection(f"{VPC_API_URL}/v1/moderation/stream/analyze")
    logger.debug("Authorizing")
    send_ws_msg(ws, "auth", {"token": DYNAMOAI_API_KEY})
    recv_ws_msg(ws, "client-info")
    logger.debug("Authorized")
    return ws

Here are some relevant example functions for starting, using, and ending a stream session. Enable debug logging to see the websocket messages.

def start_session(ws: WebSocket, input_prompt: str):
    send_ws_msg(
        ws,
        "start",
        {
            "messages": [{"role": "user", "content": input_prompt}],
            "policyIds": [POLICY_ID],
            "modelId": MODEL_ID,
        },
    )

    recv_ws_msg(ws, "session_start")


def analyze(ws: WebSocket, output_chunk: str) -> str:
    send_ws_msg(ws, "analyze", {"text": output_chunk})
    data = recv_ws_msg(ws, "analyze_result")
    return data["finalAction"]


def end_session(ws: WebSocket):
    send_ws_msg(ws, "end", {})
    recv_ws_msg(ws, "session_end")

start_session begins the streaming session. It tells DynamoGuard the preceding messages, the policies to apply, and, optionally, the model to associate the log entry with; its data object has the same shape as the request body of the analyze REST endpoint. analyze sends each streamed chunk to DynamoGuard, and its response has the same format as the REST endpoint's response. Internally, DynamoGuard waits until it has accumulated enough tokens before analyzing. end_session ends the session.
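The order of events in a session can be traced without a live connection. The sketch below replays the start, analyze, end sequence against an in-memory stand-in for the server. FakeWebSocket and request are hypothetical names, the request-to-reply mapping is inferred from the functions above, and "PASS" is a made-up placeholder for a non-blocking action, since only "BLOCK" is named in this document.

```python
import json
from collections import deque

# Reply event expected for each request event, per the functions above
REPLIES = {"start": "session_start", "analyze": "analyze_result", "end": "session_end"}


class FakeWebSocket:
    """In-memory stand-in for the server (hypothetical, for illustration only)."""

    def __init__(self):
        self.sent = []         # Request events, in the order they were sent
        self._inbox = deque()  # Queued replies waiting to be received

    def send(self, msg: str) -> None:
        event = json.loads(msg)["event"]
        self.sent.append(event)
        # "PASS" is a made-up placeholder; only "BLOCK" is named in the text above
        data = {"finalAction": "PASS"} if event == "analyze" else {}
        self._inbox.append(json.dumps({"event": REPLIES[event], "data": data}))

    def recv(self) -> str:
        return self._inbox.popleft()


def request(ws: FakeWebSocket, event: str, data: dict) -> dict:
    """Send one event and return the data of its matching reply."""
    ws.send(json.dumps({"event": event, "data": data}))
    reply = json.loads(ws.recv())
    assert reply["event"] == REPLIES[event]
    return reply["data"]


ws = FakeWebSocket()
request(ws, "start", {
    "messages": [{"role": "user", "content": "What should I invest in?"}],
    "policyIds": ["<POLICY_ID>"],
    "modelId": None,
})
actions = [request(ws, "analyze", {"text": chunk})["finalAction"]
           for chunk in ["Buy ", "index ", "funds."]]
request(ws, "end", {})
print(ws.sent)
```

Each message uses the same envelope, an event name plus a data object, so one session always consists of a single start, one analyze per chunk, and a single end.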

Now we can use OpenAI streaming:

from typing import Iterator
from openai import OpenAI

client = OpenAI(api_key=OPENAI_API_KEY)


def guarded_chat(ws: WebSocket, prompt: str) -> Iterator[str]:
    start_session(ws, prompt)
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        stream=True,
    )
    for chunk in response:
        choice = chunk.choices[0]
        message = choice.delta.content or ""
        end_of_stream = bool(choice.finish_reason)
        if not end_of_stream:
            action = analyze(ws, message)
            if action == "BLOCK":
                yield f"{message}[BLOCKED]"
                break
        yield message
    end_session(ws)


def chat(prompt: str):
    ws = create_ws_connection()
    for chunk in guarded_chat(ws, prompt):
        print(chunk, end="")

The chat function is the main entrypoint to this example.
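In the example, the session is ended only when the loop finishes normally or a chunk is blocked. If the model call raises or the caller stops consuming the generator early, the session stays open. One possible hardening, sketched below with injected callables so it runs without a connection, is to end the session in a finally clause. The function name guarded and its parameters are hypothetical, and "OK" is a made-up non-blocking action.

```python
from typing import Callable, Iterable, Iterator


def guarded(chunks: Iterable[str], analyze: Callable[[str], str],
            end_session: Callable[[], None]) -> Iterator[str]:
    """Yield chunks until one is blocked; always end the session afterwards."""
    try:
        for chunk in chunks:
            if analyze(chunk) == "BLOCK":
                yield f"{chunk}[BLOCKED]"
                break
            yield chunk
    finally:
        # Runs on normal completion, on BLOCK, on errors, and when the
        # generator is closed before being fully consumed
        end_session()


ended = []
output = list(guarded(
    ["a", "b", "c"],
    analyze=lambda chunk: "BLOCK" if chunk == "b" else "OK",  # "OK" is a placeholder
    end_session=lambda: ended.append(True),
))
print(output, ended)
```

With the websocket-client library, the connection itself can be released the same way, for example by wrapping it in contextlib.closing, since WebSocket exposes a close() method.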

gRPC (Coming Soon)