Building asynchronous LLM applications in Python

by Manuel Renner

[DALL·E-generated image of a man programming]

In this article, we’ll delve into the details of how to query LLM endpoints asynchronously to increase the performance and robustness of your LLM applications. We start with the basics of asynchronous programming and its use case in sending requests to LLMs before moving on to more advanced concepts such as handling errors with tenacity and batching requests for efficiency. The examples we use are focused on querying the OpenAI API endpoints, but these can easily be adapted to other providers or self-deployed endpoints using the httpx examples below.

Table of contents

  • What is asynchronous programming?
  • asyncio
  • OpenAI asynchronous client
  • httpx asynchronous client
  • Batching requests
  • Handling errors with tenacity
  • Summary

What is asynchronous programming?

In traditional synchronous programming, tasks are executed sequentially, blocking the main program flow between executions and creating performance bottlenecks when dealing with time-consuming operations.

Asynchronous programming, on the other hand, allows developers to initiate tasks and continue with other operations while waiting for the asynchronous tasks to complete. This non-blocking approach enhances the efficiency and responsiveness of an application, making it more scalable as it becomes capable of handling large numbers of operations simultaneously.
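
To make the difference concrete, here is a minimal sketch (not from the original article) that contrasts the two approaches, using time.sleep and asyncio.sleep as stand-ins for slow I/O such as an LLM request. It uses asyncio.gather(), which is covered below:

import asyncio
import time

def slow_task_sync():
    time.sleep(1)           # blocks the whole program for 1 second

async def slow_task_async():
    await asyncio.sleep(1)  # yields control to other tasks while waiting

def run_sync():
    start = time.perf_counter()
    slow_task_sync()
    slow_task_sync()
    print(f"synchronous: {time.perf_counter() - start:.1f}s")   # ~2.0s

async def run_async():
    start = time.perf_counter()
    await asyncio.gather(slow_task_async(), slow_task_async())
    print(f"asynchronous: {time.perf_counter() - start:.1f}s")  # ~1.0s

run_sync()
asyncio.run(run_async())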

asyncio

The asyncio Python library has been designed to simplify asynchronous programming by providing a framework for managing concurrent operations efficiently. There are plenty of resources online about the library, so I won't go into detail, but here are two common ways of running asynchronous programs:

Using asyncio.gather()

The gather() function in asyncio is a convenient way to execute multiple asynchronous tasks concurrently and collect their results once all tasks have finished. For example, to concurrently fetch data from two different URLs, you can use gather() as follows:

import asyncio

async def fetch_data(url):
    # Simulate fetching data from a URL
    # (In a real scenario, this would involve asynchronous I/O operations)
    return f"Data from {url}"

async def main():
    results = await asyncio.gather(fetch_data('url1'), fetch_data('url2'))
    print(results)

asyncio.run(main())

NOTES:

  • Asynchronous functions are defined with async def and use the await keyword, which pauses the execution of the function until the awaited asynchronous operation completes.
  • asyncio.run() is a straightforward way to execute the top-level entry point of an asyncio program.
  • If you are running this code inside a Jupyter notebook, run the following code beforehand:
import nest_asyncio
nest_asyncio.apply()

Using asyncio.as_completed()

The as_completed() function is handy for handling tasks as they are completed, allowing you to process results as soon as they are available. Here's the previous example using as_completed():

import asyncio

async def fetch_data(url):
    # Simulate fetching data from a URL
    return f"Data from {url}"

async def main():
    urls = ['url1', 'url2', 'url3']
    tasks = [fetch_data(url) for url in urls]

    for completed_task in asyncio.as_completed(tasks):
        result = await completed_task
        print(result)

asyncio.run(main())

Results are now printed as soon as each task completes.

OpenAI asynchronous client

Let’s now put this into practice using the OpenAI Python client. The openai library provides an asynchronous client, AsyncOpenAI. Here is an example of how to run multiple requests against the chat completions API using asyncio.as_completed():

import asyncio
from openai import AsyncOpenAI

async def chat_completion(client, prompt, **kwargs):
    messages = [{"role": "user", "content": prompt}]
    return await client.chat.completions.create(
        messages=messages,
        **kwargs
    )

async def run_chat_completions(client, prompts: list[str], **kwargs):
    calls = [chat_completion(client, prompt, **kwargs) for prompt in prompts]
    for completed_task in asyncio.as_completed(calls):
        response = await completed_task
        print(response)

async def main(prompts, **kwargs):
    async with AsyncOpenAI() as client:
        await run_chat_completions(client, prompts, **kwargs)

prompts = ["Hello"] * 10
asyncio.run(main(prompts, model="gpt-3.5-turbo-1106"))

In the above example, the prompt “Hello” is sent 10 times to the GPT-3.5 model, and each response is printed as soon as it completes. This is significantly faster than running the requests sequentially.

NOTES:

  • Using the asynchronous context manager async with AsyncOpenAI() ensures that the client is automatically closed when it’s no longer needed, preventing resource leaks.
  • You will need to set your OPENAI_API_KEY environment variable for the above example to run without error.
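
Printing the raw response objects is fine for a demo, but in practice you usually want just the generated text. As a small sketch (not from the original article, reusing the chat_completion helper above with a hypothetical collect_chat_completions name), the content can be extracted from each response like this:

async def collect_chat_completions(client, prompts: list[str], **kwargs):
    calls = [chat_completion(client, prompt, **kwargs) for prompt in prompts]
    contents = []
    for completed_task in asyncio.as_completed(calls):
        response = await completed_task
        # Each chat completion carries its text in choices[0].message.content
        contents.append(response.choices[0].message.content)
    return contents

Keep in mind that asyncio.as_completed() yields results in completion order, not submission order; use asyncio.gather() instead if the results must line up with the original prompts.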

httpx asynchronous client

Instead of using the OpenAI client, the asynchronous HTTP client from the httpx library can be used to query endpoints from providers other than OpenAI.

For simplicity’s sake, the same example as above is reproduced using OpenAI as the provider:

import os
import asyncio
from httpx import AsyncClient

async def http_request(client, prompt, **kwargs):
    json_data = {
        "messages": [
            {"role": "user", "content": prompt},
        ],
    }
    json_data.update(kwargs)
    return await client.post(
        url="https://api.openai.com/v1/chat/completions",
        headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
        json=json_data,
    )

async def run_http_requests(client, prompts: list[str], **kwargs):
    calls = [http_request(client, prompt, **kwargs) for prompt in prompts]
    for completed_task in asyncio.as_completed(calls):
        response = await completed_task
        print(response)

async def main(prompts, **kwargs):
    async with AsyncClient(timeout=60) as client:
        await run_http_requests(client, prompts, **kwargs)

prompts = ["Hello"] * 10
asyncio.run(main(prompts, model="gpt-3.5-turbo-1106"))

The code is very similar to the previous example, except that the httpx AsyncClient is used and the OpenAI API arguments are passed as part of the request’s json_data.

NOTE: by default the AsyncClient has a timeout of 5 seconds, which is too short for most LLM calls, as they tend to take longer than that to complete. One way to tackle this is OpenAI’s streaming API, which returns chunks of a response as soon as they are available, but that is outside the scope of this blog. For simplicity, setting timeout=60 lets the client wait up to 60 seconds before raising a timeout error, which is enough time for most requests to complete.
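
Also note that, unlike the OpenAI client, httpx hands you a raw HTTP response, so the generated text has to be pulled out of the JSON body yourself. A minimal sketch (not from the original article, assuming the same response format and a hypothetical print_completion_texts name):

async def print_completion_texts(client, prompts: list[str], **kwargs):
    calls = [http_request(client, prompt, **kwargs) for prompt in prompts]
    for completed_task in asyncio.as_completed(calls):
        response = await completed_task
        data = response.json()
        # The completion text lives in the same place as in the OpenAI client response
        print(data["choices"][0]["message"]["content"])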

Batching requests

Running an arbitrarily large number of tasks at the same time can be detrimental to the performance of your program, as it over-uses the available resources. To prevent this, tasks can be run in batches, meaning that only a subset of batch_size tasks runs at once. This can be implemented with a generator function:

def generate_batches(calls, batch_size):
    for i in range(0, len(calls), batch_size):
        yield calls[i:i + batch_size]

which can be used before running asyncio.as_completed() or asyncio.gather():

async def run_http_requests(client, prompts: list[str], **kwargs):
    calls = [http_request(client, prompt, **kwargs) for prompt in prompts]
    for batch in generate_batches(calls, 5):
        print("Starting batch")
        for completed_task in asyncio.as_completed(batch):
            response = await completed_task
            print(response)

async def main(prompts, **kwargs):
    async with AsyncClient(timeout=60) as client:
        await run_http_requests(client, prompts, **kwargs)

prompts = ["Hello"] * 10
asyncio.run(main(prompts, model="gpt-3.5-turbo-1106"))

This code will run the 10 requests in two batches, meaning it will wait for the first 5 prompts to finish before running the next 5.

While this might slow things down in this simple example, as the application scales, running hundreds of complex prompts at the same time will strain the available resources and lead to performance issues. Batching these requests helps prevent that.
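
As an alternative to explicit batches, the number of in-flight requests can also be capped with an asyncio.Semaphore. This is only a sketch of that idea (not from the original article), reusing the http_request helper with hypothetical limited_request and run_http_requests_limited names:

async def limited_request(semaphore, client, prompt, **kwargs):
    # Wait for a free slot before sending the request
    async with semaphore:
        return await http_request(client, prompt, **kwargs)

async def run_http_requests_limited(client, prompts: list[str], max_concurrency=5, **kwargs):
    semaphore = asyncio.Semaphore(max_concurrency)
    calls = [limited_request(semaphore, client, prompt, **kwargs) for prompt in prompts]
    for completed_task in asyncio.as_completed(calls):
        response = await completed_task
        print(response)

Unlike fixed batches, the semaphore keeps a constant number of requests in flight instead of waiting for the slowest request in each batch before starting the next one.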

Handling errors with tenacity

Last but not least, errors are likely to occur when calling LLM endpoints, and these errors need to be handled. tenacity is a handy Python library for retrying failed operations, allowing us to automatically re-run a request after it fails.

I won’t cover the library in detail here, but the important elements used below are the following:

  • @retry: decorator that automatically retries the function if it raises an exception.
  • wait=wait_exponential(...): sets the wait time between retries to increase exponentially.
  • retry=retry_if_exception_type(...): specifies that the function should be retried only if it raises one of the specified types of exceptions.

Let’s consider adding this to our original chat completion example:

import asyncio
from openai import AsyncOpenAI, APITimeoutError, InternalServerError, RateLimitError, UnprocessableEntityError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=2, max=5),
    retry=retry_if_exception_type(
        (
            APITimeoutError,
            InternalServerError,
            RateLimitError,
            UnprocessableEntityError,
        )
    ),
)
async def chat_completion(client, prompt, **kwargs):
    messages = [{"role": "user", "content": prompt}]
    return await client.chat.completions.create(
        messages=messages,
        **kwargs
    )

This code will retry any request that raises one of the aforementioned errors (see the openai errors documentation for more detail), up to a total of three attempts, with an exponentially increasing wait between retries starting at 2 seconds and capped at 5 seconds.

The same can be achieved for httpx using errors/exceptions such as HTTPStatusError, NetworkError, TimeoutException, etc.
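
As a rough sketch of how that could look (not from the original article), the same decorator can be attached to the http_request helper from before. One detail to keep in mind is that httpx only raises HTTPStatusError when raise_for_status() is called on the response, so the sketch does that explicitly:

from httpx import HTTPStatusError, NetworkError, TimeoutException
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=2, max=5),
    retry=retry_if_exception_type((HTTPStatusError, NetworkError, TimeoutException)),
)
async def http_request(client, prompt, **kwargs):
    json_data = {"messages": [{"role": "user", "content": prompt}]}
    json_data.update(kwargs)
    response = await client.post(
        url="https://api.openai.com/v1/chat/completions",
        headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
        json=json_data,
    )
    # Raise HTTPStatusError on 4xx/5xx responses so tenacity can retry them
    response.raise_for_status()
    return response

In a real application you would probably retry only on specific status codes (for example 429 and 5xx) rather than on every HTTPStatusError.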

Summary

Using asynchronous programming as part of your application is made very simple thanks to asyncio. You can use it together with the openai or httpx library to query LLM endpoints in a simple yet performant manner. Nevertheless, building asynchronous applications requires you to focus on other aspects of your code, such as how many requests can run at the same time and how to handle errors as they arise. Applying all of the above to your application may take time and add some level of complexity, but it can significantly boost its performance and scalability.

I hope you found this article insightful. Feel free to comment if you have any questions. Happy coding!
