Integrations
Azure Application Insights
Traceloop supports sending traces to Azure Application Insights via standard OpenTelemetry integrations.
Review how to setup OpenTelemetry with Python in Azure Application Insights.
- Provision an Application Insights instance in the Azure portal.
- Get your Connection String from the instance - details here.
- Install required packages
pip install azure-monitor-opentelemetry-exporter traceloop-sdk openai
- Example implementation
import os
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task, agent, tool
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
from openai import OpenAI
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# Configure the tracer provider to export traces to Azure Application Insights.
# Get your complete connection string from the Azure Portal or CLI.
exporter = AzureMonitorTraceExporter(connection_string="INSERT_CONNECTION_STRING_HERE")
# Pass your exporter to Traceloop
Traceloop.init(app_name="your_app_name", exporter=exporter)
@task(name="joke_creation")
def create_joke():
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke about opentelemetry"}],
)
return completion.choices[0].message.content
@task(name="signature_generation")
def generate_signature(joke: str):
completion = client.completions.create(model="davinci-002",
prompt="add a signature to the joke:\n\n" + joke)
return completion.choices[0].text
@agent(name="joke_translation")
def translate_joke_to_pirate(joke: str):
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": f"Translate the below joke to pirate-like english:\n\n{joke}"}],
)
history_jokes_tool()
return completion.choices[0].message.content
@tool(name="history_jokes")
def history_jokes_tool():
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": f"get some history jokes"}],
)
return completion.choices[0].message.content
@workflow(name="pirate_joke_generator")
def joke_workflow():
eng_joke = create_joke()
pirate_joke = translate_joke_to_pirate(eng_joke)
signature = generate_signature(pirate_joke)
print(pirate_joke + "\n\n" + signature)
if __name__ == "__main__":
joke_workflow()