Speeding up your OpenAI LLM applications

Diverger
8 min read · Feb 8, 2024


by Manuel Renner

This article covers how to deal with OpenAI requests sometimes getting stuck for a very long time, which can drastically slow down your LLM applications and hinder user experience.

To solve this, we introduce a straightforward solution using the stream API option together with the timeout parameter so that these blocked requests can easily be detected and restarted.

We have implemented this approach for both synchronous and asynchronous execution.

Problem

To demonstrate the issue, we’ve run 1,000 calls asynchronously, in batches of 100 calls at a time, and recorded the time it took for each batch to run.

These are the processing times in seconds (see Evaluation section for implementation details):

You can see that some batches took a very long time to complete, especially batch number 1, which took over 10 minutes to be processed.

This is because OpenAI’s Python client has a default timeout of 600 seconds, meaning it only raises a timeout error if a request has not completed within that time window.

Note: these times will of course differ each time you run the code, but the same issue should arise as soon as you increase the number of requests you run.
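For reference, this 600-second ceiling can be lowered, either for the whole client or per request. A minimal sketch (the values are illustrative, and the exact behaviour may vary slightly between client versions):

from openai import OpenAI

# client-wide default: applies to every request made with this client
client = OpenAI(timeout=30)  # seconds; the library default is 600

# per-request override, forwarded with this call only
response = client.chat.completions.create(
    model="gpt-3.5-turbo-1106",
    messages=[{"role": "user", "content": "hello"}],
    timeout=10,
)

On its own, however, a shorter timeout only fails faster; the streaming option described below is what lets us tell a stuck request apart from one that is legitimately still generating.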

Solution

OpenAI also offers a streaming option in their API, which returns the response in chunks, allowing you to start processing the model’s response as soon as the first token is available.

When using stream=True together with timeout=X, you can therefore stop the generation process and restart it whenever you still haven’t received the first token after X seconds.
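As a minimal sketch of this restart logic, handled manually here with an explicit loop rather than through the client’s built-in max_retries (which the implementation below relies on), and assuming the timeout surfaces as openai.APITimeoutError in your client version:

import openai
from openai import OpenAI

client = OpenAI(max_retries=0)  # retries are handled manually in this sketch

def complete_with_restart(messages, timeout=10, attempts=3):
    """Restarts the streaming request if no chunk arrives within `timeout` seconds."""
    for _ in range(attempts):
        try:
            stream = client.chat.completions.create(
                model="gpt-3.5-turbo-1106",
                messages=messages,
                stream=True,
                timeout=timeout,
            )
            # once chunks start flowing, the request is no longer stuck
            return "".join(
                chunk.choices[0].delta.content or ""
                for chunk in stream
                if chunk.choices and chunk.choices[0].delta
            )
        except openai.APITimeoutError:
            continue  # no token within `timeout` seconds: restart the request
    raise TimeoutError(f"no response after {attempts} attempts")

The Implementation section below wraps this same idea into a small reusable class.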

Here are the results for the same problem as above using stream=True and timeout=10 (see Evaluation section for implementation details):

You can see that each batch of 100 requests now took at most about 15 seconds to complete. This is because the previously stuck requests were restarted after 10 seconds and then went through.

Note: once again, running the same evaluation will produce different numbers, but even if the same request needs to be restarted twice (which shouldn’t happen often), it will still take significantly less time than without a timeout, where a request can run for up to 10 minutes before returning a response.

Implementation

This section dives into the code needed to implement this solution, both synchronously and asynchronously.

Synchronous

from typing import Any

from openai import OpenAI
from pydantic import BaseModel

class Llm(BaseModel):
    max_retries: int = 5

    def model_post_init(self, __context: Any) -> None:
        """creates openai client after class is instantiated"""
        self._client = OpenAI(max_retries=self.max_retries)

    def run_completions(self, messages, model="gpt-3.5-turbo-1106", **kwargs) -> list:
        """runs completions synchronously"""
        response = self._client.chat.completions.create(
            messages=messages, model=model, **kwargs
        )
        if "stream" in kwargs and kwargs["stream"]:
            response = self._parse_stream(response)
        return response

    def _parse_stream(self, stream):
        """parses stream response from completions"""
        response = {"role": "assistant", "content": None, "tool_calls": None}
        for chunk in stream:
            choice = chunk.choices[0]
            if choice.delta and choice.delta.content:
                self._parse_delta_content(choice.delta, response)
            elif choice.delta and choice.delta.tool_calls:
                self._parse_delta_tools(choice.delta, response)
        return response

    def _parse_delta_content(self, delta, response):
        if response["content"] is None:
            response["content"] = ""

        response["content"] += delta.content

    def _parse_delta_tools(self, delta, response):
        if response["tool_calls"] is None:
            response["tool_calls"] = []

        for tchunk in delta.tool_calls:
            if len(response["tool_calls"]) <= tchunk.index:
                response["tool_calls"].append(
                    {
                        "id": "",
                        "type": "function",
                        "function": {"name": "", "arguments": ""},
                    }
                )
            if tchunk.id:
                response["tool_calls"][tchunk.index]["id"] += tchunk.id
            if tchunk.function.name:
                response["tool_calls"][tchunk.index]["function"]["name"] += tchunk.function.name
            if tchunk.function.arguments:
                response["tool_calls"][tchunk.index]["function"]["arguments"] += tchunk.function.arguments

Notes:

  • When using stream=True the response needs to be parsed in a specific way. The parsing makes it difficult to reconstruct the full ChatCompletion object, so we instead store the result in a dictionary with "content" and "tool_calls" as keys.
  • While parsing the content of the response is relatively straightforward, parsing responses that use tool_calls is slightly more complex. This is handled by the _parse_delta_tools method, as sketched right after these notes.
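As a quick illustration of that second case, here is a hedged sketch of a streamed tool call using the Llm class above; the get_weather tool is made up purely for this example:

# hypothetical tool definition, used only to trigger a tool_calls response
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a city",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }
]

llm = Llm(max_retries=1)
response = llm.run_completions(
    messages=[{"role": "user", "content": "What is the weather in Madrid?"}],
    tools=tools,
    stream=True,
    timeout=10,
)

# the tool call id, name and arguments are accumulated chunk by chunk into plain strings,
# e.g. response["tool_calls"][0]["function"]["arguments"] -> '{"city": "Madrid"}'
print(response["tool_calls"])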

Usage example (default timeout, no streaming):

# setting up logging to see the requests info
import logging
logging.basicConfig(level=logging.INFO)

llm = Llm(max_retries=1)
response = llm.run_completions(messages=[{"role":"user", "content":"hello"}])
print(response.choices[0].message.content)

Usage example (timeout=5 seconds, no streaming):

# setting up logging to see the requests info
import logging
logging.basicConfig(level=logging.INFO)

llm = Llm(max_retries=1)
response = llm.run_completions(
    messages=[{"role": "user", "content": "generate 100 random tokens"}],
    timeout=5,
)
print(response.choices[0].message.content)

It generally takes longer than 5 seconds for the model to generate 100 random tokens, therefore causing a timeout error to be raised.

Note: before throwing the error, the OpenAI client will try to re-run the request, as configured by max_retries.
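If you prefer to handle the failure yourself instead of letting it propagate, you can catch the exception around the call; in recent versions of the openai package the timeout surfaces as openai.APITimeoutError (check the exact class for your version):

import openai

llm = Llm(max_retries=1)
try:
    response = llm.run_completions(
        messages=[{"role": "user", "content": "generate 100 random tokens"}],
        timeout=5,
    )
    print(response.choices[0].message.content)
except openai.APITimeoutError:
    print("request timed out after the client exhausted its retries")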

Usage example (timeout=5 seconds, stream=True):

# setting up logging to see the requests info
import logging
logging.basicConfig(level=logging.INFO)

llm = Llm(max_retries=1)
response = llm.run_completions(
    messages=[{"role": "user", "content": "generate 100 random tokens"}],
    timeout=5,
    stream=True,
)
print(response["content"])

Even though the request took 21 seconds to be fully generated, the first token was returned within 5 seconds, so no timeout error was thrown since generation started on time.

Using this approach, requests that do not produce a first token within X seconds will raise a timeout error and therefore be restarted automatically.

Asynchronous

The same approach can also be implemented asynchronously, leading to a much greater performance gain compared to the synchronous implementation.

If you are not familiar with asynchronous programming, you can check our blog post Building asynchronous LLM applications in python.

The same class as before is reused, adding a run_batch_completions method that runs a batch of completions asynchronously.

import asyncio
import logging
from typing import Any

from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt

def log_retry(retry_state):
    logging.info(
        "Retrying %s: attempt #%s ended with: %s",
        retry_state.fn,
        retry_state.attempt_number,
        retry_state.outcome,
    )

class Llm(BaseModel):
    batch_size: int = 100
    max_retries: int = 5

    def model_post_init(self, __context: Any) -> None:
        """creates openai client after class is instantiated"""
        self._client = OpenAI(max_retries=self.max_retries)

    def run_completions(self, messages, model="gpt-3.5-turbo-1106", **kwargs) -> list:
        """runs completions synchronously"""
        response = self._client.chat.completions.create(
            messages=messages, model=model, **kwargs
        )
        if "stream" in kwargs and kwargs["stream"]:
            response = self._parse_stream(response)
        return response

    def _parse_stream(self, stream):
        """parses stream response from completions"""
        response = {"role": "assistant", "content": None, "tool_calls": None}
        for chunk in stream:
            choice = chunk.choices[0]
            if choice.delta and choice.delta.content:
                self._parse_delta_content(choice.delta, response)
            elif choice.delta and choice.delta.tool_calls:
                self._parse_delta_tools(choice.delta, response)
        return response

    def run_batch_completions(self, messages_list: list, **kwargs) -> list:
        """run completions by batch asynchronously"""
        return asyncio.run(self._run_batch_completions(messages_list, **kwargs))

    async def _run_batch_completions(self, messages_list: list, **kwargs) -> list:
        """runs completions by batch asynchronously"""
        async with AsyncOpenAI(max_retries=self.max_retries) as client:
            coroutines = [
                self._run_async_completions(client, messages, **kwargs)
                for messages in messages_list
            ]
            responses = []
            async for batch_responses in self._run_batches(coroutines):
                responses.extend(batch_responses)
            return responses

    @retry(stop=stop_after_attempt(3), after=log_retry)
    async def _run_async_completions(self, client, messages, model="gpt-3.5-turbo-1106", **kwargs):
        """runs completions asynchronously"""
        response = await client.chat.completions.create(
            messages=messages, model=model, **kwargs
        )
        if "stream" in kwargs and kwargs["stream"]:
            response = await self._parse_async_stream(response)
        return response

    async def _parse_async_stream(self, stream):
        """parses stream response from async completions"""
        response = {"role": "assistant", "content": None, "tool_calls": None}
        async for chunk in stream:
            choice = chunk.choices[0]
            if choice.delta and choice.delta.content:
                self._parse_delta_content(choice.delta, response)
            elif choice.delta and choice.delta.tool_calls:
                self._parse_delta_tools(choice.delta, response)
        return response

    async def _run_batches(self, coroutines: list):
        for batch in self._batches(coroutines, self.batch_size):
            yield await asyncio.gather(*batch)

    def _batches(self, items, batch_size):
        for i in range(0, len(items), batch_size):
            yield items[i : i + batch_size]

    def _parse_delta_content(self, delta, response):
        if response["content"] is None:
            response["content"] = ""

        response["content"] += delta.content

    def _parse_delta_tools(self, delta, response):
        if response["tool_calls"] is None:
            response["tool_calls"] = []

        for tchunk in delta.tool_calls:
            if len(response["tool_calls"]) <= tchunk.index:
                response["tool_calls"].append(
                    {
                        "id": "",
                        "type": "function",
                        "function": {"name": "", "arguments": ""},
                    }
                )
            if tchunk.id:
                response["tool_calls"][tchunk.index]["id"] += tchunk.id
            if tchunk.function.name:
                response["tool_calls"][tchunk.index]["function"]["name"] += tchunk.function.name
            if tchunk.function.arguments:
                response["tool_calls"][tchunk.index]["function"]["arguments"] += tchunk.function.arguments

Notes:

  • The AsyncOpenAI client is instantiated for each run_batch_completions call, as reusing the same async client too many times throughout the application can lead to issues.
  • A @retry decorator is added to re-run a request in case some unexpected issue arises. This can happen when processing a large number of asynchronous calls at the same time, as too many open connections can cause failures. It can also be mitigated by reducing the batch_size, at the cost of a longer overall processing time (see the short example after these notes).
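For example, a minimal sketch of that trade-off (the value of 50 is an arbitrary choice):

# at most 50 requests in flight at a time instead of the default 100:
# fewer open connections, but a longer total processing time
llm_small_batches = Llm(batch_size=50, max_retries=3)

It is then used exactly like the default configuration in the examples below.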

Usage example (no timeout, no stream):

# asking an LLM to generate one random token, 100 times
messages_list = []
for _ in range(100):
    messages_list.append([{"role": "user", "content": "Write 1 random token"}])

llm = Llm()
responses = llm.run_batch_completions(messages_list=messages_list)

The above code runs 100 requests asynchronously, asking the LLM to generate one random token.

Usage example (timeout=10, stream=True):

llm = Llm()
responses = llm.run_batch_completions(messages_list=messages_list, timeout=10, stream=True)
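Keep in mind that with stream=True each item returned by run_batch_completions is the dictionary built by _parse_async_stream rather than a ChatCompletion object, so reading the results back looks like this:

for response in responses:
    # each streamed result exposes "content" and "tool_calls" keys
    print(response["content"])

Without streaming, the equivalent loop would read response.choices[0].message.content instead.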

Evaluation

In order to evaluate the performance gain of using a timeout and stream=True, we run the previous 100 requests 10 times (i.e. 1,000 requests in total) and record how long each batch takes to run:

  1. No timeout, no streaming
import timeit

llm = Llm()
for i in range(1, 11):
    processing_time = timeit.timeit(
        lambda: llm.run_batch_completions(messages_list),
        number=1,
    )
    print(f"Batch nr {i} took {processing_time:.2f} seconds")

Total: 13 minutes 10 seconds.

2. Timeout=10, stream=True

import timeit

llm = Llm()
for i in range(1, 11):
    processing_time = timeit.timeit(
        lambda: llm.run_batch_completions(messages_list, timeout=10, stream=True),
        number=1,
    )
    print(f"Batch nr {i} took {processing_time:.2f} seconds")

Total: 1 minute 39 seconds (8x faster compared to no timeout).

3. Timeout=20, stream=True

import timeit

llm = Llm()
for i in range(1, 11):
    processing_time = timeit.timeit(
        lambda: llm.run_batch_completions(messages_list, timeout=20, stream=True),
        number=1,
    )
    print(f"Batch nr {i} took {processing_time:.2f} seconds")

Total: 2 minutes 42 seconds (5x faster compared to no timeout).

Conclusion

Using the timeout and stream options of the OpenAI API client can save you a significant amount of time, especially when processing a large number of requests asynchronously.

In the evaluation we performed, using a timeout of 10 seconds led to a processing time around 8x faster than using no timeout.

Nevertheless, keep in mind that the gain in performance will vary according to how busy the OpenAI servers are, and whether they improve their delivery of requests in the near future.
