LLM Tool Call Streaming with LiteLLM
Tool call streaming with LiteLLM is fairly easy to implement once you understand how tool call streaming actually works. This post walks through how it works and how I implemented it.
As I mentioned previously, I'm building an LLM chat app without using NextJS + ai-sdk from Vercel. I'm trying to do it using vanilla HTML, JS, and CSS along with a FastAPI backend using LiteLLM to help interact with LLMs.
tl;dr for how tool call streaming works (and other related things)
- Tool calls are streamed one chunk at a time until a full function call has been formed. If the LLM can run tools "in parallel", it will fully stream the output for one tool call at a time.
- Once the LLM is done with its streamed output for tool calling, you'll see a chunk with its finish_reason attribute set. NOW is when you can go ahead and call those tools/functions.
- The first chunk of any tool call will have call_id (the id field, which you use later to associate function output with that tool call) and name, which is the name of the function.
- Each subsequent chunk will contain a piece of the "arguments" that it wants to pass to the function until it reaches the end of its streamed output, at which point it will output a chunk with finish_reason set.
- The streamed output representing one or more tool calls needs to be turned into a "message" that gets added to the message history of the chat. It looks something like this:
tc_message = {
    "role": "assistant",
    "content": None,
    "tool_calls": [
      {
        "id": "call_XYZ",
        "type": "function",
        "function": {
          "name": "my_function_name",
          "arguments": "{\"param1\": 123, \"param2\": \"some value\"}"
        }
      }
    ]
}
- After calling each of the functions that the LLM wanted, you take the output from each tool call and append each one as a message to the message history; each one should look something like this:
tc_message = {
    "role": "tool",
    "content": fn_output,
    "tool_call_id": "call_XYZ", # This is where the call_id comes back into play
    "name": "my_function_name" # Not sure if this is absolutely necessary, but it worked for me and I think they included it in the LiteLLM docs
}
- So to actually stream the function/tool call, you can stream the name of the function back first, then subsequent chunks of the arguments as they're pieced together. As for the outputs, you can choose how you want to send that back.
- I'll provide the (messy) code below
More detailed breakdown of tool calls and tool call streaming
One of the things ai-sdk did by default is stream tool calls which was a neat feature, and my app needs to use tools and also already does streaming. So I figured I'd try to get tool streaming working. The problem is that LiteLLM's documentation doesn't really show what it looks like or how it works.
The way normal streaming works with LiteLLM is you enable streaming with stream=True, then run in a loop: for chunk in response: <send chunk to client>. Pretty simple.
But tool calls are special. The first "chunk" from a tool call message from an LLM will include call_id and name. call_id is a unique ID that will later be used to associate the response from the function with that tool call. This is useful if the LLM decides to call multiple tools in parallel. name is the name of the function to call. So it might look something like this (I'm doing this from memory / using OpenAI's sparse docs about this topic to help me remember and show examples):
{
  // other attributes...

  "choices": [
    {
      "delta": {
        "tool_calls": [
          {
            "id": "call_DdmO9pD3xa9XTPNJ32zg2hcA",
            "type": "function",
            "function": {
              "name": "my_function_name"
            }
          }
        ]
      }
    }
  ],
  // other attributes...
}
That's the first chunk. Now each subsequent chunk will contain a small piece of the arguments string to pass along to the function. This is from OpenAI's documentation showing the sequence of streamed tool call parts inside of delta.tool_calls:
[{"index": 0, "id": "call_DdmO9pD3xa9XTPNJ32zg2hcA", "function": {"arguments": "", "name": "get_weather"}, "type": "function"}]
[{"index": 0, "id": null, "function": {"arguments": "{\"", "name": null}, "type": null}]
[{"index": 0, "id": null, "function": {"arguments": "location", "name": null}, "type": null}]
[{"index": 0, "id": null, "function": {"arguments": "\":\"", "name": null}, "type": null}]
[{"index": 0, "id": null, "function": {"arguments": "Paris", "name": null}, "type": null}]
[{"index": 0, "id": null, "function": {"arguments": ",", "name": null}, "type": null}]
[{"index": 0, "id": null, "function": {"arguments": " France", "name": null}, "type": null}]
[{"index": 0, "id": null, "function": {"arguments": "\"}", "name": null}, "type": null}]
As far as I can tell, if an LLM is planning to call multiple tools "in parallel", they're still streamed out one at a time until they're complete. In what I saw, it never streamed out arguments for function call 2 in the middle of streaming out arguments for function call 1. So if an LLM plans to call (for example) 2 tools, it will output a series of chunks as shown above (the first chunk has the function name and call ID, and subsequent chunks have the arguments streamed out) for the first tool, then move on to the next one. Each entry also carries an index (0 in the sequence above), which tells you which tool call a chunk belongs to even when its id is null.
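To make the chunk sequence above concrete, here is a minimal sketch of reassembling it by hand. It assumes each delta.tool_calls entry has already been converted to a plain dict (LiteLLM hands you objects, not dicts), and accumulate_tool_calls is a name I made up for illustration, not a LiteLLM function. It groups pieces by index and concatenates the argument strings.

```python
import json


def accumulate_tool_calls(delta_lists):
    """Merge streamed tool-call deltas into complete tool calls, keyed by index."""
    calls = {}
    for deltas in delta_lists:
        for delta in deltas or []:
            entry = calls.setdefault(
                delta["index"], {"id": None, "name": None, "arguments": ""}
            )
            # id and name only show up on the first chunk of each tool call
            if delta.get("id"):
                entry["id"] = delta["id"]
            fn = delta.get("function") or {}
            if fn.get("name"):
                entry["name"] = fn["name"]
            # every chunk may carry another piece of the JSON arguments string
            entry["arguments"] += fn.get("arguments") or ""
    return [calls[i] for i in sorted(calls)]


# The sequence from the listing above, written as Python dicts
stream = [
    [{"index": 0, "id": "call_DdmO9pD3xa9XTPNJ32zg2hcA",
      "function": {"arguments": "", "name": "get_weather"}, "type": "function"}],
    [{"index": 0, "id": None, "function": {"arguments": "{\"", "name": None}, "type": None}],
    [{"index": 0, "id": None, "function": {"arguments": "location", "name": None}, "type": None}],
    [{"index": 0, "id": None, "function": {"arguments": "\":\"", "name": None}, "type": None}],
    [{"index": 0, "id": None, "function": {"arguments": "Paris", "name": None}, "type": None}],
    [{"index": 0, "id": None, "function": {"arguments": ",", "name": None}, "type": None}],
    [{"index": 0, "id": None, "function": {"arguments": " France", "name": None}, "type": None}],
    [{"index": 0, "id": None, "function": {"arguments": "\"}", "name": None}, "type": None}],
]

tool_calls = accumulate_tool_calls(stream)
# The arguments only become valid JSON once the last piece has arrived
parsed_args = json.loads(tool_calls[0]["arguments"])
print(tool_calls[0]["name"], parsed_args)
```

Note that the arguments string can't be parsed until the whole tool call has been streamed, which is why the function calls have to wait until the end.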
Once you receive a chunk from the LLM with finish_reason set (it's None on every chunk before that), you can safely assume this is the last chunk of the LLM's output. Now the next step is really important - we need to add the LLM's tool call to the message history. Here's generally what the message looks like:
tc_message = {
    "role": "assistant",
    "content": None,
    "tool_calls": [
      {
        "id": "call_XYZ",
        "type": "function",
        "function": {
          "name": "my_function_name",
          "arguments": "{\"param1\": 123, \"param2\": \"some value\"}"
        }
      }
    ]
}
I initially thought I needed to keep track of and build up/append the chunks myself so that I could build out this message. But LiteLLM provides a handy feature. All we need to do is save each chunk with chunks.append(chunk), and then once we get a chunk with finish_reason set, we can use their utility to reassemble the full message: final_message = litellm.stream_chunk_builder(chunks).
Then we add that message to our message history.
Once that's done, this is when you go and execute the functions it asked you to. You can choose to execute them one at a time or in parallel - it's your choice. But after getting all the output for the tool calls, you need to create a new message for each tool call output that looks like this:
tc_message = {
    "role": "tool",
    "content": fn_output,
    "tool_call_id": "call_XYZ", # This is where the call_id comes back into play
    "name": "my_function_name" # Not sure if this is absolutely necessary, but it worked for me and I think they included it in the LiteLLM docs
}
This keeps our full message history intact, including the LLM's chosen tool calls and the outputs from those calls.
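Here's a rough sketch of the "execute in parallel, then build one tool message per call" step using asyncio.gather. Everything named here is hypothetical and only for illustration: get_weather is a stand-in tool, TOOL_REGISTRY is an assumed name-to-function mapping, and the tool calls are plain dicts in the same shape as the assistant message above.

```python
import asyncio
import json


async def get_weather(location: str) -> str:
    """Hypothetical tool; a real one would call an API."""
    return f"Sunny in {location}"


# Hypothetical mapping from tool name to the coroutine that implements it
TOOL_REGISTRY = {"get_weather": get_weather}


async def run_tool_call(tool_call: dict) -> dict:
    """Run one tool call and wrap its output in a role="tool" message."""
    fn = tool_call["function"]
    try:
        handler = TOOL_REGISTRY[fn["name"]]
        output = await handler(**json.loads(fn["arguments"]))
    except Exception as exc:
        # Report the failure to the LLM instead of crashing the chat loop
        output = f"Tool error: {exc}"
    return {
        "role": "tool",
        "tool_call_id": tool_call["id"],  # ties the output back to the call
        "name": fn["name"],
        "content": output,
    }


async def run_all_tool_calls(tool_calls: list) -> list:
    """Run every requested tool concurrently; results keep the input order."""
    return await asyncio.gather(*(run_tool_call(tc) for tc in tool_calls))


assistant_tool_calls = [
    {"id": "call_1", "type": "function",
     "function": {"name": "get_weather", "arguments": "{\"location\": \"Paris\"}"}},
    {"id": "call_2", "type": "function",
     "function": {"name": "get_weather", "arguments": "{\"location\": \"Tokyo\"}"}},
]

tool_messages = asyncio.run(run_all_tool_calls(assistant_tool_calls))
for message in tool_messages:
    print(message)
```

Each returned dict can be appended to the message history as-is, right after the assistant message that requested the calls.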
As for streaming, I'm using websockets and came up with some custom message types to stream tools. The code below isn't finished and is pretty messy, since I wrote it while I was getting this figured out. But you can follow along and see how I stream things back. You can choose whether or not to stream back the function name first, then stream each argument chunk as it comes out.
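# Imports this snippet relies on (tools and handle_all_tool_calls are not shown here)
import asyncio
import json
import litellm
from fastapi import WebSocket
from litellm import acompletion
from pydantic import BaseModel

# Definition of the ToolCall and ToolCallOutput objects I created: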
class ToolCall(BaseModel):
    id: str
    name: str
    arguments: str
    def get_json(self):
        return {
            "type": "function",
            "id": self.id,
            "function": {
                "name": self.name,
                "arguments": self.arguments
            }
        }
        
class ToolCallOutput(ToolCall):
    response: str
    def get_json(self):
        return {
            "role": "tool",
            "tool_call_id": self.id,
            "name": self.name,
            "content": self.response
        }
async def stream_response(websocket: WebSocket, messages, model, message_id):
    """ Handles streaming while checking for stop messages. """
    run_another_completion = True
    try:
        while run_another_completion:
            run_another_completion = False
            response = await acompletion(
                model=model,
                messages=messages,
                stream=True,
                tools=tools,
                tool_choice="auto"
            )
            print("Streaming response now...")
            stop_requested = False  # Flag to track stop requests
            chunks = []
            async for chunk in response:
                #print(chunk)
                chunk_message = None  # Reset each iteration so a stale or undefined message is never sent
                chunks.append(chunk)
                # **Check if a stop message is received**
                try:
                    stop_message = await asyncio.wait_for(websocket.receive_text(), timeout=0.01)  # Non-blocking check
                    stop_data = json.loads(stop_message)
                    if stop_data.get("type") == "stop":
                        print("\n=== Stop Requested During Streaming ===")
                        stop_requested = True
                        break
                except asyncio.TimeoutError:
                    pass  # No message received, continue streaming
                if chunk and chunk.choices[0].finish_reason:
                    break
                else:
                    if chunk.choices[0].delta.tool_calls:
                        fn_name = chunk.choices[0].delta.tool_calls[0].function.get("name")
                        call_id = chunk.choices[0].delta.tool_calls[0].id
                        # NOTE: Super annoying but with streaming, things like call_id are None after the first chunk until the end when we can reassemble the message
                        if fn_name and call_id:
                            chunk_message = json.dumps({
                                "type": "tool_call_init",
                                "name": fn_name,
                                "tool_call_id": call_id
                            })
                        else:
                            # After getting this first tool call init chunk, we don't want to send anything else until the end
                            chunk_message = {}
                    elif chunk and chunk.choices[0].delta.content:
                        chunk_content = chunk.choices[0].delta.content
                        chunk_message = json.dumps({"type": "chunk", "content": chunk_content, "message_id": message_id})
                    # Send the chunk to the client
                    if chunk_message:
                        await websocket.send_text(chunk_message)
                await asyncio.sleep(0)  # Allow the event loop to handle other tasks
            final_message = litellm.stream_chunk_builder(chunks)
            if final_message.choices[0].message.tool_calls:
                tool_call_list = []
                for tc in final_message.choices[0].message.tool_calls:
                    tool_call_list.append(ToolCall(
                        id=tc.id,
                        name=tc.function.name,
                        arguments=tc.function.arguments
                    ))
                tc_message = {
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [tc.get_json() for tc in tool_call_list]
                }
                # Attach this to the message history server-side
                messages.append(tc_message)
                # Send client this message object
                await websocket.send_text(json.dumps({
                    "type": "tool_call_message",
                    "message": tc_message
                }))
                # Now handle tool calls
                tool_responses = await handle_all_tool_calls(tool_call_list)
                for tr in tool_responses:
                    messages.append(tr.get_json())
                    await websocket.send_text(json.dumps({
                        "type": "tool_call_update",
                        "tool_call_id": tr.id,
                        "param_chunk": f"({tr.arguments}) - Done with tool call!"
                    }))
                    # Attach this to the message history server-side
                    await websocket.send_text(json.dumps({
                        "type": "tool_call_response_message",
                        "message": tr.get_json()
                    }))
                run_another_completion = True
            else:
                await websocket.send_text(json.dumps({
                    "type": "assistant_message",
                    "message": {
                        "role": "assistant",
                        "content": final_message.choices[0].message.content
                    }
                }))
        # **Send completion message unless stopped**
        if not stop_requested:
            await websocket.send_text(json.dumps({"type": "complete"}))
    except asyncio.CancelledError:
        print("\n=== Streaming Task Cancelled ===")
        await websocket.send_text(json.dumps({"type": "stopped", "content": "Streaming interrupted."}))
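The code above only sends the tool_call_init message and then stays quiet until the tool call is complete. If you also want to stream each argument chunk to the client, one way to sketch it is below. This is an assumption-laden example, not what my app does: tool_call_events is a made-up helper, the delta is a plain dict rather than a LiteLLM object, and it reuses my tool_call_init and tool_call_update message types with param_chunk carrying the raw argument piece.

```python
import json


def tool_call_events(delta: dict, index_to_id: dict) -> list:
    """Turn one tool-call delta into the JSON strings to send to the client.

    index_to_id remembers the call ID seen on the first chunk of each tool
    call, since later chunks only carry the index.
    """
    events = []
    fn = delta.get("function") or {}
    if delta.get("id") and fn.get("name"):
        # First chunk of a tool call: announce the function name and call ID
        index_to_id[delta["index"]] = delta["id"]
        events.append(json.dumps({
            "type": "tool_call_init",
            "name": fn["name"],
            "tool_call_id": delta["id"],
        }))
    if fn.get("arguments"):
        # Any chunk with a non-empty argument piece becomes an update event
        events.append(json.dumps({
            "type": "tool_call_update",
            "tool_call_id": index_to_id.get(delta["index"]),
            "param_chunk": fn["arguments"],
        }))
    return events


seen_ids = {}
first = tool_call_events(
    {"index": 0, "id": "call_1", "function": {"name": "get_weather", "arguments": ""}},
    seen_ids,
)
later = tool_call_events(
    {"index": 0, "id": None, "function": {"name": None, "arguments": "{\"location"}},
    seen_ids,
)
print(first)
print(later)
```

Inside the streaming loop, each returned string would be passed to websocket.send_text, and the client would append param_chunk to whatever it has already displayed for that tool_call_id.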
How did I figure this out?
Sometimes it's more valuable to understand how someone investigated and resolved an issue as opposed to just showing the final answer. So I wanted to share some high level details for how I figured out how this all works.
LiteLLM's documentation didn't lay out enough of the details that I could build this with little effort. In the code above, you'll notice I'm using acompletion which is the async version of LiteLLM's completion function. One thing I could have done to make this easier is to copy some of this code and switch to using the synchronous version completion to test outside of a FastAPI async endpoint. Because if I recall correctly, there were some timeout issues and other things that testing inside of an async API endpoint caused.
But overall, figuring out how tool call streaming worked required piecing together bits and pieces from LiteLLM's documentation on tool calling, some of OpenAI's documentation on tool calling (since their API basically set the standard for the rest), and using ipdb inside of my program to debug things and step through. You can use pdb, but ipdb is a bit friendlier to debug with. It's a separate package built on top of IPython, which is a much nicer Python interpreter to work in than the standard one.
Some high level bullet points that outline my process for solving this (mostly in order):
- Update acompletion to force selecting a tool, so that the LLM will always call a tool while I'm testing: acompletion(..., tool_choice="required", ...)
  - NOTE: I think this is obvious, but you should give your LLM a tool that it can call. Specifying tool_choice="required" if the LLM has no tools available will probably throw an error.
- Put a breakpoint before entering the chunking loop: import ipdb; ipdb.set_trace()
- Inspect each chunk one at a time and notice that the first one outputs the name and call_id of the function to call. Then subsequent chunks output the arguments as streamed chunks. And then finally notice that the "final" chunk contains a finish_reason.
  - It didn't make sense to me that the first chunk of a function call was the only one that contained a call ID. I thought each subsequent chunk would still have the call_id in it so that you know to which tool call the current arguments belong. For example, if an LLM picked multiple tools to call, is it possible that it could start outputting chunks for multiple different tool calls?
    - This was a dumb question: with a basic understanding of how LLMs work, it can only stream one tool call at a time. Based on the way tool calls are streamed, I assumed that it would only ever output a tool call and its subsequent chunks one function at a time.
    - Meaning it would say something like "I want to call function abc and here are the arguments for it: <args>. Next, I want to call function xyz and here are the arguments for it" and so on.
- Write code to track the chunks and build up the function call with arguments myself.
  - I later found out that LiteLLM has a utility for this. All you need to do is save chunks in a list. Then once you get a finish_reason, you can call litellm.stream_chunk_builder(chunks) and it will rebuild the entire message for you.
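# Inside an async function, using the same arguments as in stream_response above
response = await acompletion(model=model, messages=messages, tools=tools, stream=True)
chunks = []
async for chunk in response:
  chunks.append(chunk)
  # finish_reason stays None until the final chunk of the response
  if chunk.choices[0].finish_reason:
    break
final_message = litellm.stream_chunk_builder(chunks)
- Then I wasn't sure what to do. I didn't know how tool calling would fit in with chat/message history. I knew that simple messages looked like this: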
[
  {"role": "system", "content": "<system prompt>"},
  {"role": "user", "content": "this is my prompt"},
  {"role": "assistant", "content": "this is the llm's response to my prompt"},
  // etc...
]
- But what do tool call messages look like?
- After lots of errors from the API complaining that I was screwing up the message history with tool calls, I realized the correct order of operations:
  - Tool call streaming completes
  - Rebuild the assistant's tool call message using litellm.stream_chunk_builder(chunks), extract all the tool calls from the message (because there could be multiple), and add that to the message history with a role of "assistant", content should be None, and tool_calls should be a list of the tool calls. (code for this has been shared above a few times)
  - Then call all of the functions the LLM asked for and save the output
  - Then generate another message for each individual tool call with role as "tool", tool_call_id as the call ID that the LLM shared with us, and content as the tool call output. Append all of those to the message history
- So message history might look something like this:
[
  {"role": "system", "content": "<system prompt>"},
  {"role": "user", "content": "this is my prompt"},
  {"role": "assistant", "content": None, "tool_calls": [
    // tool calls from the LLM...
  ]},
  {"role": "tool", "tool_call_id": "call_XYZ", "content": "<tool output>"},
  {"role": "assistant", "content": "Based on the tool call output, ..."},
]
I think that pretty much sums it up. Some SDKs and tools out there might help with this, but I thought it was interesting and useful to learn exactly how tool call streaming works, at least through LiteLLM and OpenAI. I'm not sure if it's different for other LLMs, but it might be.
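One last sketch, since most of my errors came from getting the message order wrong: a small checker that walks a message history and flags tool calls with no matching tool output, or tool outputs with no matching call. find_history_problems is a hypothetical helper I'm using for illustration, and the rules it encodes are only my understanding of the ordering described above, not an official validation from LiteLLM or OpenAI.

```python
def find_history_problems(messages: list) -> list:
    """Return a list of human-readable problems with tool call ordering."""
    problems = []
    pending = set()  # call IDs still waiting for a role="tool" message
    for i, msg in enumerate(messages):
        role = msg.get("role")
        if role == "tool":
            call_id = msg.get("tool_call_id")
            if call_id in pending:
                pending.discard(call_id)
            else:
                problems.append(f"message {i}: tool output for unknown call ID {call_id!r}")
            continue
        # Any non-tool message means the previous tool calls should be answered
        if pending:
            problems.append(f"message {i}: missing tool output for {sorted(pending)}")
            pending.clear()
        if role == "assistant":
            for tool_call in msg.get("tool_calls") or []:
                pending.add(tool_call["id"])
    if pending:
        problems.append(f"end of history: missing tool output for {sorted(pending)}")
    return problems


good_history = [
    {"role": "system", "content": "<system prompt>"},
    {"role": "user", "content": "this is my prompt"},
    {"role": "assistant", "content": None, "tool_calls": [
        {"id": "call_XYZ", "type": "function",
         "function": {"name": "my_function_name", "arguments": "{}"}},
    ]},
    {"role": "tool", "tool_call_id": "call_XYZ", "content": "<tool output>"},
    {"role": "assistant", "content": "Based on the tool call output, ..."},
]
# Same history with the tool output dropped, which is the mistake I kept making
bad_history = good_history[:3] + good_history[4:]

print(find_history_problems(good_history))
print(find_history_problems(bad_history))
```

Running something like this before each completion call would have saved me a few rounds of API errors.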
