Feb 12, 2025
Mhar Andal
Transports are base implementations for handling audio/video I/O streams.
import asyncio

from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport

async def main():
    # Use Daily as a real-time media transport (WebRTC)
    transport = DailyTransport(
        room_url=...,
        token="",  # leave empty. Note: token is _not_ your api key
        bot_name="Bot Name",
        params=DailyParams(audio_out_enabled=True))

    # Use Cartesia for Text-to-Speech
    tts = CartesiaTTSService(
        api_key=...,
        voice_id=...
    )

    # Simple pipeline that will process text to speech and output the result
    pipeline = Pipeline([tts, transport.output()])

    # Create a Pipecat runner that can run one or more pipeline tasks
    runner = PipelineRunner()

    # Create the task that will run the pipeline
    task = PipelineTask(pipeline)

    # Register an event handler to play audio when a
    # participant joins the transport WebRTC session
    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        participant_name = participant.get("info", {}).get("userName", "")
        # Queue a TextFrame that will get spoken by the TTS service (Cartesia)
        await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))

    # Register an event handler to exit the application when the user leaves.
    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        await task.cancel()

    # Run the pipeline task
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())
Event handlers let us respond to transport events, such as a participant joining or leaving the call.
In the example above, we generate some text to speech as soon as the first participant joins the call.
This is a great starting point, but we need to update our pipeline to add more functionality.
import os

from pipecat.frames.frames import OutputAudioRawFrame, Frame, TextFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.user_response import UserResponseAggregator
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.logger import FrameLogger
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import (
    OpenAILLMContext,
    OpenAILLMContextFrame,
    OpenAILLMService,
)
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

class IntakeProcessor:
    def __init__(self, context: OpenAILLMContext):
        print("Initializing context from IntakeProcessor")

        # Add relevant business context to the LLM
        context.add_message(
            {
                "role": "system",
                "content": """
                You are Alex, an agent for a company called Ender. Your job is to act as a companion for a virtual gaming club.

                The user is in central time.

                You should address the user by their first name and be polite as if you were talking to a child.
                Add punctuation where appropriate and at the end of each transcript whenever possible.
                To insert pauses, insert "-" where you need the pause.

                Your job is to help the child with any questions they have.

                Talk in simple sentences for a child to understand.

                The gaming club is from 4-6 PM in all timezones.

                This user in particular is neurodivergent, so be very kind and patient with them.

                You must follow these guidelines:

                Empathy and Support: You should recognize when a child may be frustrated or anxious and respond with encouragement and reassurance.

                Example Behavior: If a child expresses frustration ("This level is too hard!"), you could respond with:
                "It sounds like you're feeling a bit stuck, and that's okay! How about we try breaking it down into smaller steps? What's one thing we can try together?"

                Encouraging Social Interaction: You should promote teamwork and connection by encouraging kids to interact with their peers positively.

                Example Behavior: When the child completes a task, you might say:
                "Great job! Would you like to share what you did with your teammates? They might find it really helpful!"

                Autonomy and Choice: You should offer choices that allow the child to feel in control of their gaming experience.

                Example Behavior: When a child completes a challenge, you can offer multiple options:
                "Congrats on finishing! Would you like to explore a new area, customize your character, or take a quick break?"

                Clear Instructions and Guidance: You should provide clear, step-by-step instructions to help kids who may struggle with more complex gameplay or social situations.

                Example Behavior: If a child seems confused about the next step, explain it simply:
                "No worries if you're unsure! Here's what you can do next: First, open the map, then choose the challenge you want to try. If you need more help, just ask!"

                Encouraging Breaks and Self-Care: You should encourage kids to take breaks when needed, promoting mindfulness and well-being.

                Example Behavior: After noticing the child has been playing for a long time, you could say:
                "You've been playing for a while now - how about a quick stretch or water break? You can jump back into the fun in just a few minutes!"

                Ask for clarification if a user response is ambiguous.

                If they ask how to join the Minecraft server, ask them which device they want to join on. There are 4 options: computer, phone, tablet, or console (Switch, Xbox).

                If they say Xbox, you must tell them these steps:

                1. Visit https://account.xbox.com/settings.
                2. Under "Privacy" in the table header, locate the setting labeled "You can communicate outside of Xbox with voice & text" and choose Allow.
                3. Under "Xbox and Windows 10 devices Online Safety" in the table header, select Allow for "You can create and join clubs", "You can join multiplayer games", and "You can add friends".

                If they say computer, phone, or tablet, you must tell them these steps:

                1. Open Minecraft.
                2. Click Multiplayer, then click the Add Server button.
                3. Type "minecraft.joinender.com" in the server address.
                4. Click Minecraft Club in your server list.
                5. Enter the code you see in Minecraft.

                If the user asks for a host or moderator, tell them that you will send a request for a host or moderator to join the room.

                If the user wants to report an incident, ask for details, then tell them that you will send a report to the host or moderator.

                Start by introducing yourself and say you are a companion for the Ender Minecraft Club.
                """,
            }
        )
        context.set_tools(
            [
                {
                    "type": "function",
                    "function": {
                        "name": "report_incident",
                        "description": "Use this function to report an incident.",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "incident": {
                                    "type": "string",
                                    "description": "Capture the incident details.",
                                }
                            },
                        },
                    },
                }
            ]
        )

    async def report_incident(
        self, function_name, tool_call_id, args, llm, context, result_callback
    ):
        # args["incident"] contains the incident details captured by the LLM.
        # Acknowledge the report and let the user know it will be handled.
        await result_callback(
            [
                {
                    "role": "system",
                    "content": "Next, thank the user for reporting the incident, and let them know you'll take care of it.",
                }
            ]
        )


llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = []
context = OpenAILLMContext(messages=messages)
context_aggregator = llm.create_context_aggregator(context)
user_response = UserResponseAggregator()
image_requester = UserImageRequester()  # custom processor that requests a screen image (sketched below)
vision_aggregator = VisionImageFrameAggregator()

pipeline = Pipeline(
    [
        transport.input(),  # Transport input (audio/video from the room)
        context_aggregator.user(),  # User responses
        user_response,  # Aggregate the user's transcribed speech into a single frame
        image_requester,  # Request a screen-share image for each user response
        vision_aggregator,  # Pair the image with the text before the LLM
        llm,  # LLM
        fl,  # Frame logger
        tts,  # TTS
        transport.output(),  # Transport output
        context_aggregator.assistant(),  # Assistant responses
    ]
)
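The snippet above also references a few objects that aren't defined in it: transport, tts, fl, and an IntakeProcessor instance. Here's a minimal sketch of how they might be created before the pipeline is constructed; the bot name, DailyParams, TTS voice, and environment variable names are illustrative assumptions, not settings from this project. The task and runner are then created the same way as in the first example.

# Illustrative wiring for the objects referenced above (names and params are assumptions)
transport = DailyTransport(
    room_url=...,
    token="",
    bot_name="Ender Companion",
    params=DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        transcription_enabled=True,
        vad_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
)

tts = ElevenLabsTTSService(api_key=os.getenv("ELEVENLABS_API_KEY"), voice_id=...)

fl = FrameLogger("LLM output")  # log frames flowing out of the LLM for debugging

intake = IntakeProcessor(context)  # seeds the system prompt and tools on the context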
By adding a vision aggregator to the pipeline, we can send contextual vision data to our LLM.
This will allow the voice agent to see the user's screen and provide a contextual response.
We now just need to update our transport to support capturing the participant's screen share.
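The image_requester in the pipeline is a small custom FrameProcessor rather than a class shipped with Pipecat. One possible implementation, using the imports from the snippet above: whenever a user response flows through, it pushes a UserImageRequestFrame upstream so the transport captures a fresh frame of the participant's screen.

# Sketch of a custom processor that asks the transport for a screen image
# whenever the user says something. Assumes the participant id is set from
# the transport event handler shown below.
class UserImageRequester(FrameProcessor):
    def __init__(self):
        super().__init__()
        self._participant_id = None

    def set_participant_id(self, participant_id: str):
        self._participant_id = participant_id

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if self._participant_id and isinstance(frame, TextFrame):
            # Ask the transport (upstream) for the latest screen-share frame
            await self.push_frame(
                UserImageRequestFrame(self._participant_id), FrameDirection.UPSTREAM
            )
        await self.push_frame(frame, direction)

With a processor like this, the event handler below would also call image_requester.set_participant_id(participant["id"]) once the participant joins, so the requester knows whose video to ask for.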
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    # Subscribe to the participant's screen-share video track
    await transport.update_subscriptions(
        participant_settings={participant["id"]: {"media": {"screenVideo": "subscribed"}}}
    )
    # framerate=0: only capture a frame when one is requested
    await transport.capture_participant_video(
        participant["id"], framerate=0, video_source="screenVideo"
    )
And voilà! We've successfully built a voice agent that can capture the user's screen share and provide it to our LLM as context.
Bonus: with the IntakeProcessor, we can register functions for the LLM to call to automate tasks, such as reporting an incident. We can also add context to the LLM to help it make better decisions or personalize its responses for the user.
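Note that declaring the tool with context.set_tools only advertises the function to the model; to actually run our Python handler when the model calls it, we also register it on the LLM service. A minimal sketch, using the llm and intake objects created earlier:

# Route the model's "report_incident" tool calls to our handler
llm.register_function("report_incident", intake.report_incident)

When the model decides to call report_incident, Pipecat invokes intake.report_incident, and the messages passed to result_callback steer what the bot says next.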