This article covers how to deal with OpenAI requests sometimes getting stuck for a very long time, which can drastically slow down your LLM applications and hinder user experience.
To solve this, we introduce a straightforward solution that combines the stream API option with the timeout parameter, so that these blocked requests can easily be detected and restarted.
We have implemented this approach for both synchronous and asynchronous execution.
Problem
To demonstrate the issue, we've run 1000 calls asynchronously, in batches of 100 calls at a time, and recorded the time it took for each batch to run.
These are the processing times in seconds (see the Evaluation section for implementation details):
You can see that some batches took a very long time, especially batch number 1, which took over 10 minutes to be processed.
This is because OpenAI's Python client has a default timeout of 600 seconds, meaning that it won't raise a timeout error until a request has been pending for that entire time window.
Note: these times will of course differ each time you run the code, but the same issue should arise as soon as you increase the number of requests you run.
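For reference, this 600-second default can be overridden when the client is created, or per request; a minimal sketch (the 30-second value is purely illustrative):
# Hedged sketch: lowering the OpenAI client's default request timeout.
from openai import OpenAI

client = OpenAI(timeout=30)  # applies to every request made with this client
# ...or per request:
# client.chat.completions.create(messages=..., model=..., timeout=30)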
Solution
OpenAI also offers a streaming option in their API, which returns the response in chunks, allowing you to start processing the model's output as soon as the first token is available.
When using stream=True together with timeout=X, you can therefore stop the generation process and restart it whenever you still haven't received the first token after X seconds.
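A minimal sketch of the idea (the model, prompt, and 10-second threshold are only illustrative; the full implementation follows in the next section):
from openai import OpenAI

client = OpenAI(max_retries=1)
# stream=True: the first chunk arrives as soon as the first token is generated;
# timeout=10: give up (and let the client retry) if no chunk arrives within 10 s
stream = client.chat.completions.create(
    model="gpt-3.5-turbo-1106",
    messages=[{"role": "user", "content": "hello"}],
    stream=True,
    timeout=10,
)
text = "".join(chunk.choices[0].delta.content or "" for chunk in stream)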
Here are the results for the same problem as above using stream=True and timeout=10 (see the Evaluation section for implementation details):
You can see that a batch of 100 requests took at most 15 seconds to generate. This is because the previously stuck requests were restarted after 10 seconds and were then able to go through.
Note: once again, running the same evaluation will give different results, but even if the same request needs to be restarted twice (which shouldn't happen often), it will still take significantly less time than without a timeout, where a request can run for up to 10 minutes before returning a response.
Implementation
This section dives into the code that implements this solution, both synchronously and asynchronously.
Synchronous
from typing import Any

from openai import OpenAI
from pydantic import BaseModel


class Llm(BaseModel):
    max_retries: int = 5

    def model_post_init(self, __context: Any) -> None:
        """creates openai client after class is instantiated"""
        self._client = OpenAI(max_retries=self.max_retries)

    def run_completions(self, messages, model="gpt-3.5-turbo-1106", **kwargs) -> list:
        """runs completions synchronously"""
        response = self._client.chat.completions.create(
            messages=messages, model=model, **kwargs
        )
        if "stream" in kwargs and kwargs["stream"]:
            response = self._parse_stream(response)
        return response

    def _parse_stream(self, stream):
        """parses stream response from completions"""
        response = {"role": "assistant", "content": None, "tool_calls": None}
        for chunk in stream:
            choice = chunk.choices[0]
            if choice.delta and choice.delta.content:
                self._parse_delta_content(choice.delta, response)
            elif choice.delta and choice.delta.tool_calls:
                self._parse_delta_tools(choice.delta, response)
        return response

    def _parse_delta_content(self, delta, response):
        """accumulates streamed text content"""
        if response["content"] is None:
            response["content"] = ""
        response["content"] += delta.content

    def _parse_delta_tools(self, delta, response):
        """accumulates streamed tool calls, indexed by tool call position"""
        if response["tool_calls"] is None:
            response["tool_calls"] = []
        for tchunk in delta.tool_calls:
            if len(response["tool_calls"]) <= tchunk.index:
                response["tool_calls"].append(
                    {
                        "id": "",
                        "type": "function",
                        "function": {"name": "", "arguments": ""},
                    }
                )
            if tchunk.id:
                response["tool_calls"][tchunk.index]["id"] += tchunk.id
            if tchunk.function.name:
                response["tool_calls"][tchunk.index]["function"][
                    "name"
                ] += tchunk.function.name
            if tchunk.function.arguments:
                response["tool_calls"][tchunk.index]["function"][
                    "arguments"
                ] += tchunk.function.arguments
Note:
- When using stream=True, the response needs to be parsed in a specific way. This parsing makes it difficult to reconstruct the full ChatCompletion object, so we instead store the result in a dictionary with "content" and "tool_calls" as keys (an illustration of the resulting dictionary is shown after these notes).
- While parsing the content of the response is relatively straightforward, parsing responses that use tool_calls is slightly more complex. This is implemented by the _parse_delta_tools method.
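For illustration, a streamed tool-call response parsed this way ends up looking roughly like this (all values are made up):
# Illustrative shape of the dictionary built by _parse_stream for a tool call
{
    "role": "assistant",
    "content": None,
    "tool_calls": [
        {
            "id": "call_abc123",
            "type": "function",
            "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'},
        }
    ],
}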
Usage example (default timeout, no streaming):
# setting up logging to see the requests info
import logging
logging.basicConfig(level=logging.INFO)
llm = Llm(max_retries=1)
response = llm.run_completions(messages=[{"role":"user", "content":"hello"}])
print(response.choices[0].message.content)
Usage example (timeout=5 seconds, no streaming):
# setting up logging to see the requests info
import logging
logging.basicConfig(level=logging.INFO)
llm = Llm(max_retries=1)
response = llm.run_completions(
messages=[{"role":"user", "content":"generate 100 random tokens"}],
timeout=5
)
print(response.choices[0].message.content)
It generally takes the model longer than 5 seconds to generate 100 random tokens, so a timeout error is raised.
Note: before raising the error, the OpenAI client will retry the request up to max_retries times.
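If you prefer to handle the retry yourself, for instance to change the request arguments or log each attempt, here is a possible sketch using tenacity (the same library used in the asynchronous implementation below); run_with_timeout is a hypothetical helper, not part of the class above:
# Hedged sketch: retrying ourselves when no token arrives in time.
# openai.APITimeoutError is the exception raised by the v1 Python client on timeouts.
import openai
from tenacity import retry, retry_if_exception_type, stop_after_attempt

@retry(retry=retry_if_exception_type(openai.APITimeoutError), stop=stop_after_attempt(3))
def run_with_timeout(llm, messages):
    return llm.run_completions(messages=messages, timeout=5, stream=True)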
Usage example (timeout=5 seconds, stream=True):
# setting up logging to see the requests info
import logging
logging.basicConfig(level=logging.INFO)
llm = Llm(max_retries=1)
response = llm.run_completions(
messages=[{"role":"user", "content":"generate 100 random tokens"}],
timeout=5,
stream=True
)
print(response["content"])
Even though the request took 21 seconds to be fully generated, the first token was returned within 5 seconds, meaning no timeout error was raised since generation started in time.
Using this approach, requests that do not produce a first token within X seconds will raise a timeout error and therefore automatically be restarted.
Asynchronous
The same approach can also be implemented asynchronously, leading to a much greater performance gain compared to the synchronous implementation.
If you are not familiar with asynchronous programming, you can check our blog post Building asynchronous LLM applications in python.
The same class as before is reused, with an added run_batch_completions method that runs a batch of completions asynchronously.
import asyncio
import logging
from typing import Any

from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt


def log_retry(retry_state):
    logging.info(
        "Retrying %s: attempt #%s ended with: %s",
        retry_state.fn,
        retry_state.attempt_number,
        retry_state.outcome,
    )


class Llm(BaseModel):
    batch_size: int = 100
    max_retries: int = 5

    def model_post_init(self, __context: Any) -> None:
        """creates openai client after class is instantiated"""
        self._client = OpenAI(max_retries=self.max_retries)

    def run_completions(self, messages, model="gpt-3.5-turbo-1106", **kwargs) -> list:
        """runs completions synchronously"""
        response = self._client.chat.completions.create(
            messages=messages, model=model, **kwargs
        )
        if "stream" in kwargs and kwargs["stream"]:
            response = self._parse_stream(response)
        return response

    def _parse_stream(self, stream):
        """parses stream response from completions"""
        response = {"role": "assistant", "content": None, "tool_calls": None}
        for chunk in stream:
            choice = chunk.choices[0]
            if choice.delta and choice.delta.content:
                self._parse_delta_content(choice.delta, response)
            elif choice.delta and choice.delta.tool_calls:
                self._parse_delta_tools(choice.delta, response)
        return response

    def run_batch_completions(self, messages_list: list, **kwargs) -> list:
        """runs completions by batch asynchronously"""
        return asyncio.run(self._run_batch_completions(messages_list, **kwargs))

    async def _run_batch_completions(self, messages_list: list, **kwargs) -> list:
        """runs completions by batch asynchronously"""
        async with AsyncOpenAI(max_retries=self.max_retries) as client:
            coroutines = [
                self._run_async_completions(client, messages, **kwargs)
                for messages in messages_list
            ]
            responses = []
            async for batch_responses in self._run_batches(coroutines):
                responses.extend(batch_responses)
            return responses

    @retry(stop=stop_after_attempt(3), after=log_retry)
    async def _run_async_completions(self, client, messages, model="gpt-3.5-turbo-1106", **kwargs):
        """runs completions asynchronously"""
        response = await client.chat.completions.create(
            messages=messages, model=model, **kwargs
        )
        if "stream" in kwargs and kwargs["stream"]:
            response = await self._parse_async_stream(response)
        return response

    async def _parse_async_stream(self, stream):
        """parses stream response from async completions"""
        response = {"role": "assistant", "content": None, "tool_calls": None}
        async for chunk in stream:
            choice = chunk.choices[0]
            if choice.delta and choice.delta.content:
                self._parse_delta_content(choice.delta, response)
            elif choice.delta and choice.delta.tool_calls:
                self._parse_delta_tools(choice.delta, response)
        return response

    async def _run_batches(self, coroutines: list):
        """processes the coroutines in groups of self.batch_size"""
        for batch in self._batches(coroutines, self.batch_size):
            yield await asyncio.gather(*batch)

    def _batches(self, items, batch_size):
        """splits items into chunks of batch_size"""
        for i in range(0, len(items), batch_size):
            yield items[i : i + batch_size]

    def _parse_delta_content(self, delta, response):
        """accumulates streamed text content"""
        if response["content"] is None:
            response["content"] = ""
        response["content"] += delta.content

    def _parse_delta_tools(self, delta, response):
        """accumulates streamed tool calls, indexed by tool call position"""
        if response["tool_calls"] is None:
            response["tool_calls"] = []
        for tchunk in delta.tool_calls:
            if len(response["tool_calls"]) <= tchunk.index:
                response["tool_calls"].append(
                    {
                        "id": "",
                        "type": "function",
                        "function": {"name": "", "arguments": ""},
                    }
                )
            if tchunk.id:
                response["tool_calls"][tchunk.index]["id"] += tchunk.id
            if tchunk.function.name:
                response["tool_calls"][tchunk.index]["function"][
                    "name"
                ] += tchunk.function.name
            if tchunk.function.arguments:
                response["tool_calls"][tchunk.index]["function"][
                    "arguments"
                ] += tchunk.function.arguments
Notes:
- The async client (AsyncOpenAI) is instantiated for each run_batch_completions call, since reusing the same async client too many times throughout the application leads to issues.
- A @retry decorator is added to re-run a request in case an unexpected issue arises. This can happen when processing a large number of asynchronous calls at the same time, due to the large number of connections being open. It can also be mitigated by reducing batch_size, although doing so might slow down the overall processing time (an alternative way of capping concurrency is sketched after these notes).
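As an alternative to fixed-size batches (not part of the implementation above), the number of coroutines running at once can also be capped with an asyncio.Semaphore; a rough sketch:
import asyncio

async def gather_with_limit(coroutines, limit=100):
    """runs coroutines concurrently while keeping at most `limit` of them in flight"""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coroutines))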
Usage example (no timeout, no stream):
# asking an LLM to generate one random token, 100 times
messages_list = []
for _ in range(100):
    messages_list.append([{"role": "user", "content": "Write 1 random token"}])

llm = Llm()
responses = llm.run_batch_completions(messages_list=messages_list)
The above code runs 100 requests asynchronously, asking the LLM to generate one random token.
Usage example (timeout=10, stream=True):
llm = Llm()
responses = llm.run_batch_completions(messages_list=messages_list, timeout=10, stream=True)
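Note that with stream=True each element of responses is the dictionary built by _parse_async_stream rather than a ChatCompletion object, so the generated text is read from the "content" key:
for response in responses:
    print(response["content"])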
Evaluation
In order to evaluate the performance gain of using a timeout together with stream=True, we run the previous 100 requests 10 times (i.e. 1000 requests in total) and record how long each batch takes to run:
1. No timeout, no streaming
import timeit
llm = Llm()
for i in range(1, 11):
    processing_time = timeit.timeit(
        lambda: llm.run_batch_completions(messages_list),
        number=1,
    )
    print(f"Batch nr {i} took {processing_time:.2f} seconds")
Total: 13 minutes 10 seconds.
2. Timeout=10, stream=True
import timeit
llm = Llm()
for i in range(1, 11):
    processing_time = timeit.timeit(
        lambda: llm.run_batch_completions(messages_list, timeout=10, stream=True),
        number=1,
    )
    print(f"Batch nr {i} took {processing_time:.2f} seconds")
Total: 1 minute 39 seconds (8x faster compared to no timeout).
3. Timeout=20, stream=True
import timeit
llm = Llm()
for i in range(1, 11):
    processing_time = timeit.timeit(
        lambda: llm.run_batch_completions(messages_list, timeout=20, stream=True),
        number=1,
    )
    print(f"Batch nr {i} took {processing_time:.2f} seconds")
Total: 2 minutes 42 seconds (5x faster compared to no timeout).
Conclusion
Using the timeout and stream options of the OpenAI API client can save you a significant amount of time, especially when processing a large number of requests asynchronously.
In the evaluation we performed, using a timeout of 10 seconds led to a processing time around 8x faster than using no timeout at all.
Nevertheless, keep in mind that the performance gain will vary depending on how busy the OpenAI servers are, and on whether they improve their request handling in the near future.