Советы

Дедубликация: как OpenAI и FastAPI спасут Habr от дублей

До того, как у меня появилась первая настоящая работа, большинство проектов, над которыми я работал, были предназначены только для меня, например, созданное мной “приложение” для чата, которым пользовался только я и моя семья.

Там не было других разработчиков, с которыми я работал, и никого не волновало, насколько удобен в обслуживании мой код. Однако в “реальном мире” вы довольно быстро узнаете, как важно писать легко тестируемый код.

Одной из первых концепций, которую я изучил на своей первой работе, была инъекция зависимостей. В этом посте мы рассмотрим, что это такое и как его эффективно использовать, используя зависимости FastAPI.

Что такое внедрение зависимостей?

Инъекция зависимостей – это причудливый способ сказать “функции/объекты должны иметь переменные, от которых они зависят, переданные в них, вместо того, чтобы конструировать их самостоятельно”. Чтобы лучше понять это, давайте рассмотрим пример:

from datetime import date
def get_formatted_date(): return date.today().strftime(«%b %d, %Y»)

Эта функция кажется достаточно простой, но что если мы захотим ее проверить? На самом деле это немного сложно проверить, потому что дата постоянно меняется. Мы могли бы сами получить дату и использовать ее для сравнения, но тогда мы фактически переписываем функцию.

Что если мы перепишем нашу функцию следующим образом:

from datetime import date class Clock: def today(self): return date.today() def get_formatted_date(clock): return clock.today().strftime(«%b %d, %Y»)

Это выглядит очень похоже, но функция, которую мы тестируем, теперь принимает часы. Главное преимущество здесь в том, что мы можем создать собственные часы для тестирования (обычно их называют макетами, или в данном случае MockClock).

class MockClock: def __init__(self, year, month, day): self.year = year self.month = month self.day = day def today(self): return date(self.year, self.month, self.day) def test_get_formatted_date(): assert «Jan 01, 2021» == get_formatted_date(MockClock(2021, 1, 1)) assert «Apr 18, 2021» == get_formatted_date(MockClock(2021, 4, 18)) assert «May 19, 1999» == get_formatted_date(MockClock(1999, 5, 19))

И теперь мы можем легко протестировать нашу функцию. В данном случае часы были зависимостью, которая была внедрена в нашу функцию get_formatted_date.

Подождите… разве это не переводит проблему в другое место?

Да. Вам все еще нужно сконструировать Часы и передать их в ваши функции.

Обычно, однако, существуют фреймворки, которые сделают все это за вас. Если вы пользовались pytest, вы можете быть знакомы с их фикстурами, которые являются еще одной формой инъекции зависимостей. Вот фрагмент кода из документации по fixtures:

import pytest @pytest.fixture
def first_entry(): return «a» @pytest.fixture
def order(first_entry): return [first_entry] def test_string(order): order.append(«b») assert order == [«a», «b»]

Видно, что просто добавив аргумент, соответствующий названию фикстуры (order и first_entry), pytest автоматически инжектирует возвращаемое значение этого метода в нашу функцию. Хорошие фреймворки для инъекции зависимостей не заставят вас писать слишком много кода “склеить все вместе”.

Зависимости FastAPI

FastAPI имеет свою собственную инъекцию зависимостей, встроенную во фреймворк. Давайте рассмотрим первый пример из их документации:

async def common_parameters( q: Union[str, None] = None, skip: int = 0, limit: int = 100
): return {«q»: q, «skip»: skip, «limit»: limit} @app.get(«/items/»)
async def read_items(commons: dict = Depends(common_parameters)): return commons @app.get(«/users/»)
async def read_users(commons: dict = Depends(common_parameters)): return commons

Казалось бы, все просто: вы создаете функции, которые могут быть вставлены в любой маршрут. Теперь эти два маршрута будут принимать q, skip и limit в качестве параметров запроса.

Объединение некоторых параметров запроса – это хорошо, но мы можем сделать гораздо больше. Мы начнем с этого примера маршрута:

from typing import Union from fastapi import FastAPI, Header, HTTPException app = FastAPI() @app.get(«/whoami»)
async def who_am_i(x_api_key: Union[str, None] = Header(default=None), ): if x_api_key is None: raise HTTPException(status_code=401) # See https://fastapi.tiangolo.com/tutorial/sql-databases/ for details on SessionLocal
db = SessionLocal() try: api_key = lookup_api_key(db, x_api_key) if api_key is None: raise HTTPException(status_code=401) return {«user»: api_key.user_id} finally: db.close()

Этот маршрут принимает API-ключ через заголовок X-Api-Key, выполняет поиск в базе данных, чтобы убедиться, что это действительный ключ, а затем возвращает пользователя, которому принадлежит этот ключ. Если ключ недействителен или отсутствует, возвращается сообщение 401.

Давайте очистим это с помощью зависимостей:

# This is recommended from FastAPI's docs
# By yielding, the request continues and uses the DB
# It'll only be close when the request is finished
def get_db(): db = SessionLocal() try: yield db finally: db.close() # Dependencies can take in other dependencies, so we take in the DB
# We also take in the header, and do the same lookups as before
def user_id_from_api_key(db: Session = Depends(get_db), x_api_key: Union[str, None] = Header(default=None)): if x_api_key is None: raise HTTPException(status_code=401) api_key = lookup_api_key(db, x_api_key) if api_key is None: raise HTTPException(status_code=401) return api_key.user_id

Теперь, когда мы определили зависимость, наш маршрут становится очень простым:

@app.get(«/whoami»)
async def who_am_i(user_id: str = Depends(user_id_from_api_key)): return {«user»: user_id}

Под капотом этот маршрут проверяет, чтобы убедиться, что только пользователи с действительными ключами API могут использовать эту конечную точку. Любые другие маршруты, которые мы хотим защитить, мы можем сделать это, просто добавив этот единственный аргумент.

Поскольку FastAPI хорошо спроектирован, он знает, что для этого запроса нужен заголовок X-Api-Key, и добавляет его в спецификацию OpenAPI для этого маршрута:

Тестирование зависимостей FastAPI

Когда мы говорили об инъекции зависимостей в начале, мы говорили о том, что это делает наш код чистым и более легким для тестирования. До сих пор мы говорили только о том, как сделать наш код FastAPI чище.

Оказывается, в FastAPI есть поддержка переопределения зависимостей при тестировании. Это позволяет нам указать макетную версию зависимостей, которая будет использоваться во всем нашем проекте. Для примера с ключом API в тестах мы можем просто ввести определенный user_id и не беспокоиться о настройке базы данных каждый раз.

Резюме

Инъекция зависимостей позволяет нам структурировать наш код таким образом, чтобы его было легко поддерживать и легко тестировать.

Используя такой фреймворк, как FastAPI, вы можете делать такие вещи, как извлечение и проверка пользователя в одной строке кода. Его можно повторно использовать для любого маршрута и легко высмеивать в тестах.

В PropelAuth, как пример, мы использовали зависимости в нашей библиотеке FastAPI для инкапсуляции всей нашей логики авторизации, так что разработчику достаточно добавить одну строчку кода, а о проверке пользователей/организаций позаботятся все.

Как начать работать с FastAPI на Python

Когда веб-фреймворки Python, такие как Flask и Django, впервые появились и начали приобретать популярность и известность, Python был несколько другим языком, чем сегодня.

Многие элементы современного Python, такие как асинхронное выполнение и стандарт ASGI (Asynchronous Server Gateway Interface), на тот момент либо находились в зачаточном состоянии, либо еще не существовали. Поэтому не удивительно, что сейчас возникают новые фреймворки.

Один из них — FastAPI. В этой статье мы разберем, как начать работать с FastAPI на Python.

FastAPI – это веб-фреймворк Python, который изначально создавался для использования современных функций Python. Он использует стандарт ASGI для асинхронного, конкурентного соединения с клиентами.

При необходимости также может работать с WSGI. Асинхронные функции могут использоваться для маршрутов и конечных точек.

Кроме того, FastAPI позволяет быстро и эффективно создавать веб-приложения с чистым, современным Python-кодом, с подсказками типов.

Как следует из названия, основным вариантом использования FastAPI является создание конечных точек API. Для этого нужно просто вернуть данные словаря Python в формате JSON или использовать стандарт OpenAPI, включая интерактивный Swagger UI.

Но FastAPI никоим образом не ограничивается API. С его помощью можно сделать практически все, что делают с использованием веб-фреймворков. От доставки простеньких старых веб-страниц с применением движка шаблонов Jinja2 до обслуживания приложений на базе WebSockets.

Читайте также:  Руководство по Python для начинающих

Установка FastAPI

FastAPI может устанавливать довольно много компонентов самостоятельно. Поэтому лучше начинать любой проект на FastAPI в новой чистой виртуальной среде.

Основные компоненты FastAPI можно установить с помощью команды pip install fastapi.

Вам также потребуется установить сервер ASGI для локального тестирования.

FastAPI хорошо работает с Uvicorn, поэтому мы будем использовать его в наших примерах. Для установки Uvicorn с оптимальным набором компонентов с библиотеками введите команду pip install uvicorn [standard]. А чтобы установить минимальную версию на чистом Python, используйте команду pip install uvicorn.

Простой пример использования FastAPI

Простое приложение на FastAPI выглядит следующим образом:

from fastapi import FastAPI
app = FastAPI()

@app.get(«/»)
async def root():
return {«greeting»:»Hello world»}

Сохраните этот код в файле main.py, затем запустите из своего виртуального окружения с помощью команды uvicorn main: app. Объект app – это то, что вы будете использовать для своего сервера ASGI. Обратите внимание, что вы также можете использовать WSGI с адаптером ASGI-to-WSGI. Однако лучше всего использовать ASGI.

Когда все заработает, перейдите по ссылке http:\localhost:8000 (дефолтный адрес для тестового сервера Uvicorn на вашей локальной машине). В браузере вы увидите {«greeting»: «Hello world»} – валидный ответ JSON, сгенерированный из словаря.

Надеемся, этот пример даст вам представление о том, насколько легко FastAPI позволяет передавать JSON. Все, что нужно сделать, это создать маршрут и вернуть словарь Python, который будет автоматически сериализован в JSON.

Для сериализации сложных типов данных нужно предпринять дополнительные шаги, но о них мы поговорим позже.

[python_ad_block]

Общие принципы приложения FastAPI должны быть знакомы любому, кто работал с таким фреймворком, как Flask:

  • Объект app импортируется на сервер ASGI или WSGI и используется для запуска приложения
  • Добавление маршрутов в приложение можно выполнить с помощью декоратора. Например, @app.get («/») создает маршрут метода GET в корне сайта с результатами, возвращаемыми обернутой функцией.

Но всё же у FastAPI есть свои особенности. Во-первых, функции маршрутизации могут быть асинхронными. Поэтому любые развертываемые асинхронные компоненты – например, соединение промежуточного программного обеспечения асинхронной базы данных – также могут работать в этих функциях.

Обратите внимание, что ничто не мешает вам использовать обычные функции синхронизации, если они нужны.

Допустим, у вас есть операция, требующая больших вычислительных ресурсов (в отличие от операции, ожидающей ввода-вывода — что является лучшим вариантом использования асинхронного режима).

В таком случае будет лучше использовать функцию синхронизации и позволить FastAPI разобраться с ней. В остальных случаях используйте асинхронность.

Типы маршрутов в FastAPI

Декоратор @app позволяет устанавливать метод, используемый для маршрута, например, @app.get или @app.post. Таким образом поддерживаются методы GET, POST, PUT, DELETE и менее используемые OPTIONS, HEAD, PATCH, TRACE.

Вы также можете поддерживать несколько методов на заданном маршруте. Для этого нужно просто обернуть несколько функций маршрутизации, например @app.get («/») для одной функции и @app.post («/») для другой.

Параметры путей, запросов и форм

Если вы хотите извлечь переменные из пути маршрута, это можно сделать, определив их при объявлении маршрута, а затем передав их в функцию маршрута.

@app.get(«/users/{user_id}»)
async def user(user_id: str):
return {«user_id»:user_id}

Чтобы извлечь параметры запроса из URL-адреса, можно использовать типизированные объявления в функции маршрутизации, которые FastAPI обнаружит автоматически:

userlist = [«Spike»,»Jet»,»Ed»,»Faye»,»Ein»]

@app.get(«/userlist»)
async def userlist_(start: int = 0, limit: int = 10):
return userlist[start:start+limit]

Обработка данных формы происходит немного сложнее. Во-первых, вам нужно установить дополнительную библиотеку python-multipart для анализа данных формы (pip install python-multipart). Затем вы сможете использовать синтаксис, аналогичный синтаксису параметров запроса, но с некоторыми изменениями:

from fastapi import Form

@app.post(«/lookup»)
async def userlookup(username: str = Form(…), user_id: str = Form(«»)):
return {«username»: username, «user_id»:user_id}

Объект Form извлекает именованный параметр (username, user_id) из отправленной формы и передает его. Обратите внимание: если вы используете Form(…) при объявлении, это намек на то, что рассматриваемый параметр является обязательным, как username в данном примере. Для необязательного элемента Form() можно передать значение по умолчанию, как в случае с user_id.

Типы ответов в FastAPI

Типом ответа по умолчанию для FastAPI является JSON. Все примеры, которые мы видели до сих пор, возвращают данные, которые автоматически сериализуются как JSON. Но вы можете возвращать данные и в другом формате.

Например:

from fastapi.responses import HTMLResponse

@app.get(«/»)
def root():
return HTMLResponse(«Hello world«)

Модуль fastapi.responses поддерживает множество распространенных типов ответов:

  • HTMLResponse или PlainTextResponse – возвращает текст как HTML или обычный текст
  • RedirectResponse – перенаправляет на указанный URL
  • FileResponse – возвращает файл по указанному пути в асинхронной потоковой передаче
  • StreamingResponse – принимает генератор и передает результаты клиенту

Кроме того, вы можете использовать общий объект Response и предоставить собственный настраиваемый код состояния, заголовки, контент и тип мультимедиа.

Объект Response в FastAPI

Обратите внимание: если вы хотите работать с Response, например, путем установки файлов cookie или заголовков, вы можете сделать это, приняв объект Response в качестве параметра в своей функции маршрутизации.

from fastapi import Response

@app.get(«/»)
def modify_header(response:Response):
response.headers[«X-New-Header»] = «Hi, I'm a new header!
return {«msg»:»Headers modified in response»}

Файлы cookie в FastAPI

Получение файлов cookie от клиента работает примерно так же, как обработка параметров запроса или формы:

from fastapi import Cookie

@app.get(«/»)
async def main(user_nonce: Optional[str]=Cookie(none)):
return {«user_nonce»: user_nonce}

Установка файлов cookie выполняется с помощью метода .set_cookie() объекта Response:

from fastapi import Response

@app.post(«/»)
async def main(response: Response):
response.set_cookie(key=»user_nonce», value=»»)
return {«msg»:»User nonce cookie cleared»}

Использование моделей Pydantic с FastAPI

Типы в Python, как правило, необязательны. Однако FastAPI больший «приверженец» использования типов, чем многие другие фреймворки Python. Для декларативной проверки отправленных данных он использует библиотеку Pydantic, так что вам не придется прописывать эту логику самостоятельно.

Вот пример того, как с помощью Pydantic можно проверить входящий JSON:

Document a FastAPI App with OpenAPI

FastAPI automatically generates an OpenAPI schema that can be accessed by your API’s users. The documentation generated by the OpenAPI schema helps users learn about your API’s features.

This guide introduces how FastAPI creates documentation from your code. It also shows you how to provide custom information related to your API, if necessary.

For example, you may want to modify an endpoint’s description or label a field as deprecated.

How to Document FastAPI Endpoints

FastAPI provides automatic documentation that follows the OpenAPI specification. In our CRUD Write Operations: Use FastAPI to Write an API guide, you write a List Programming Languages endpoint with the annotation, @app.get('/programming_languages') as seen in the following example:

File: main.py

1
2
3
4
5
6
7
8
9
@app.get('/programming_languages')
def list_programming_languages(before_year: int = 30000, after_year: int = 0):
qualifying_data = list(
filter(
lambda pl: before_year > pl.publication_year > after_year,
in_memory_datastore
)
)
return {«programming_languages» : qualifying_data }

If you run the example code above and visit localhost:8000/docs, you see the documentation displayed as shown below:

To run the code in the main.py file, navigate to the root of your FastAPI project and start the app.

hypercorn main:app —reload

  • If you click on the endpoint entry, an interface to try out the endpoint appears.
  1. An API with endpoints to GET, PUT, POST, and DELETE a resource (in this example, programming_languages), would have documentation automatically generated by FastAPI as follows:

The automatic documentation is generated from the OpenAPI Specification (OAS), an API description format for REST APIs.

In perfectly conventional REST API applications, this automatic documentation might suffice to help clients understand how to use it. But in some cases, it’s helpful to customize the documentation.

Some reasons you may want to manually update your generated API documentation are:

  • Some of the fields that a client can submit on the POST request require an explanation.
  • One of the fields is deprecated, but is still available in the endpoint’s response to avoid backward-breaking changes. Normally, a deprecated field is eventually removed from the API. The deprecated status lets clients know that they should switch to using non-deprecated fields.
Читайте также:  Как правильно давать обратную связь

How to Upload File using FastAPI?

First, as per FastAPI documentation, you need to install python-multipart—if you haven't already—as uploaded files are sent as «form data». For instance:

pip install python-multipart

The below examples use the .file attribute of the UploadFile object to get the actual Python file (i.e., SpooledTemporaryFile), which allows you to call SpooledTemporaryFile's methods, such as .read() and .close(), without having to await them.

It is important, however, to define your endpoint with def in this case—otherwise, such operations would block the server until they are completed, if the endpoint was defined with async def.

In FastAPI, a normal def endpoint is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).

The SpooledTemporaryFile used by FastAPI/Starlette has the max_size attribute set to 1 MB, meaning that the data are spooled in memory until the file size exceeds 1 MB, at which point the data are written to a temporary file on disk, under the OS's temp directory.

Hence, if you uploaded a file larger than 1 MB, it wouldn't be stored in memory, and calling file.file.read() would actually read the data from disk into memory.

Thus, if the file is too large to fit into your server's RAM, you should rather read the file in chunks and process one chunk at a time, as described in «Read the File in chunks» section below.

You may also have a look at this answer, which demonstrates another approach to upload a large file in chunks, using Starlette's .stream() method and streaming-form-data package that allows parsing streaming multipart/form-data chunks, which results in considerably minimising the time required to upload files.

If you have to define your endpoint with async def—as you might need to await for some other coroutines inside your route—then you should rather use asynchronous reading and writing of the contents, as demonstrated in this answer.

Moreover, if you need to send additional data (such as JSON data) together with uploading the file(s), please have a look at this answer.

I would also suggest you have a look at this answer, which explains the difference between def and async def endpoints.

Upload Single File

app.py

from fastapi import File, UploadFile

@app.post(«/upload»)
def upload(file: UploadFile = File(…)):
try:
contents = file.file.read()
with open(file.filename, 'wb') as f:
f.write(contents)
except Exception:
return {«message»: «There was an error uploading the file»}
finally:
file.file.close()

return {«message»: f»Successfully uploaded {file.filename}»}

Read the File in chunks

As described earlier and in this answer, if the file is too big to fit into memory—for instance, if you have 8GB of RAM, you can't load a 50GB file (not to mention that the available RAM will always be less than the total amount installed on your machine, as other applications will be using some of the RAM)—you should rather load the file into memory in chunks and process the data one chunk at a time. This method, however, may take longer to complete, depending on the chunk size you choose—in the example below, the chunk size is 1024 * 1024 bytes (i.e., 1MB). You can adjust the chunk size as desired.

FastAPI, asyncio и multiprocessing

22 Фев. 2022, Python, 13058 просмотров,

Недавно товарищ поделился со мной ссылкой на статью про FastAPI и кооперативную мультипоточность.

В ней автор, во-первых, ссылается на исследование другого автора про сравнение производительности между синхронными и асинхронными Python фреймворками.

А во-вторых, приводит личный пример падения производительности приложения и как итог значительного увеличение задержки ответов от сервера.

Суть его проблемы в том, что в одном из API Endpoint выполняется CPU-intensive задача, а именно генерация списка из Pydantic-моделей, который позже конвертируется в JSON. Сериализация/десериализация огромного количества объектов недешевая процедура, а тем более для pydantic-моделей, где есть дополнительные накладные расходы. Т.е.

эта задача попадает под классический CPU-bound пример. Но нужно понимать, что любой CPU-intensive код полностью блокирует eventloop, и переключение между корутинами прекращается до полной его разблокировки. Автор статьи предлагает использовать несколько воркеров при запуске uvicorn и надеяться, что нагрузка не прилетит сразу на все.

Другой вариант это запускать тяжелые задачи в отдельном пуле потоков, но я не понимаю каким образом это решает проблему CPU-intensive задач в связи с присутствием общего GIL (Global Interpreter Lock).

В конце концов чтобы улучшить производительность приложения, он рекомендует забрать у FastAPI конвертацию Pydantic модели в json и сделать это самостоятельно, используя метод .json() у pydantic-объекта, выплюнув в итоге голый Response:

Было:

@app.get(«/clients», response_model=ClientsResponse)
def clients():
return ClientsResponse(
items=[
Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
for i in range(40_000)
]
)

Стало:

@app.get(«/clients»)
def clients():
return Response(
content=ClientsResponse(
items=[
Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
for i in range(40_000)
]
).json(),
media_type=»application/json»,
)

Производительность стала лучше, но проблема блокировки eventloop не ушла. Потому что CPU-intensive задача всё ещё выполняется в рамках того же процесса что и eventloop. Как сделать лучше? Вынести “тяжелую” задачу в отдельный процесс.

В рамках работы с asyncio можно создавать процесс и ждать выполнения задачи асинхронно. Сделать этого можно через Executor-классы из модуля concurrent.futures и метод run_in_executor у eventloop’а.

Для CPU-intensive задач подойдёт ProcessPoolExecutor.

Вот как бы это выглядело:

from concurrent.futures import ProcessPoolExecutor

def generate_client_ids(n: int) -> Response:
return Response(
content=ClientsResponse(
items=[
Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
for i in range(n)
]
).json(),
media_type='application/json',
)

@app.get('/clients')
async def clients():
with ProcessPoolExecutor() as executor:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, generate_client_ids, 50_000)
return result

В этом случае не произойдёт ни одного таймаута или обрыва соединения, eventloop не блокируется и может переключаться между корутинами пока ожидает ответ от сабпроцесса. По-хорошему стоит вынести пул в общий доступ, чтобы не создавать его каждый раз при выполнении функции clients, но это уже другая история.

Внимательный читатель может заметить, что автор статьи использует синхронные функции (без ключевого слова async), такие функции будут запускаться через ThreadPool самим FastAPI, но сути это не меняет. Чтобы улучшить производительность и избежать таймаутов всё равно нужно запустить CPU-intensive задачу через ProcessPoolExecutor:

@app.get('/clients')
def clients(n: int) -> Response:
with ProcessPoolExecutor() as pool:
for f in as_completed([pool.submit(generate_client_ids, 40_000)]):
return f.result()

Итог

  • если вы работаете в асинхронном режиме избегайте блокирующих операций (длительные CPU-intensive задачи)
  • если избежать не получается, то выносите их в отдельные процессы и общайтесь с ними асинхронно: очереди, ProcessPoolExecutor и т.д.
  • если ваше приложение в основном состоит из CPU-intensive задач, то возможно лучше использовать обычный синхронный фреймворк (Django, Flask, Bottle, Falcon), а все “тяжелые” операции проводить через очереди типа Celery, RQ и т.д.

Building a Streaming OpenAI App in Python — CleverTap Tech Blog

  • In a world where technology is advancing at breakneck speed, OpenAI APIs stand out as a game-changing innovation. In this blog, I’ll take you on a journey to harness the power of OpenAI’s APIs and build a Rest API using FastAPI in Python
  • Find the entire code here: https://github.com/SidJain1412/StreamingFastAPI
  • FastAPI is:
  • Easy to use
  • Faster than Flask 
  • Comes with built-in documentation
  • Adheres to OpenAPI (not to be confused with OpenAI) standards 
Читайте также:  ТОП-9 стажировок для программиста в 2023 году

Recently, we at CleverTap released Scribe, and we built the API behind it using FastAPI, using many of its functionalities.

In this blog, I’ll go over the basics of FastAPI, Query validations and Streaming APIs , and we’ll build an API that generates a marketing campaign strategy for the brand given in the input!

If you’re familiar with ChatGPT (and who isn’t?), you’ll know that it generates responses on-the-fly, rather than waiting until the entire output is ready before displaying it. This clever technique keeps users engaged by allowing them to read the response as it’s being formed.

  1. As we build our own API on top of OpenAI’s APIs, we should take advantage of this feature to create a seamless and satisfying experience for end-users.
  2. Building an API with FastAPI is very easy. A few lines of code and we have an API ready, with documentation and validations

Run this app using uvicorn simple_app:app –reload and go to http://127.0.0.1:8000/docs to find documentation and test your API there.

The interactive docs page generated by FastAPI

  • Most people who have used Flask know that this is much simpler, and the improved performance is an added bonus!
  • You need an OpenAI API key for this step forward, find/create it here

In this code, we are simple getting the API Key from environment variables (can be set using export OPENAI_API_KEY= in your terminal), then creating a method using FastAPI decorators (@app.get). The method get_response_openai simply fires a query and returns the generated text (which is a marketing campaign outline in this case). Check out OpenAI Documentation for more about this method.

The input for the API is defined in prompt: str = Query(…, max_length=20). The … tells FastAPI that prompt is a required parameter. We have kept the max_length short because all we need from the user is a brand name.

FastAPI automatically handles data validation (i.e. if the input is more than 20 characters long, it will throw an appropriate error message and code)

Now that we know how to create a simple API, let’s convert it to a streaming one.

If you tested the above example, you would notice that generating the output takes 20-30 seconds. Even though streaming won’t accelerate the output, providing real-time feedback to the end user that the output is being generated keeps them engaged and interested.

First, we import from fastapi.responses import StreamingResponse, which allows us to return chunks of data from our API.

OpenAI allows us to get streaming responses just by adding a parameter.

  1. We can iterate through this response using this code:

Notice that we have replaced return with yield. Yield converts the function to a generator function, which returns an iterable object, allowing us to iterate over the response from the function as it gets generated. A good read if you want to understand further: https://www.guru99.com/python-yield-return-generator.html 

  • Now, we wrap our FastAPI method with StreamingResponse to convert it to a streaming API.
  • Specifying the media_type prevents browsers from buffering the output (ref this answer)

And that’s it! Our API can now stream responses directly from OpenAI’s API.

Refer to this commit to see the changes all together.

Let’s see the API in action. We can do this with the requests library in Python, or even with a simple cURL command.

Running the code gives us this result:

Live Streaming in Jupyter

What happens when OpenAI throws an error because too many people are using ChatGPT to solve homework problems?

A good API throws good, understandable errors, with correct error codes (not everything is a 500 error!).

  1. Let’s import from fastapi import HTTPException and define our error message in case we get a 503 error from OpenAI. 
  2. error503 = «OpenAI server is busy, try again later»
  3. In case of an error when calling OpenAI APIs, we simply raise an HTTPException using raise HTTPException(503, error503).
  4. Now if and when OpenAI models are down, our API end user will get a nice, consumable error message.
  5. We can add this error to our API documentation by adding this line to our API definition:

And that’s it, we have our streaming API with error handling ready to ship!

In Part 2 of this blog, I will go over logging, caching, monitoring and deployment!

Дедупликация (Deduplication)

  • Синонимы: Дедубликация, Устранение дубликатов
  • Разделы: Бизнес-задачи
  • Решения: Loginom Data Quality

В информационных технологиях дедупликацией называют процесс исключения из наборов данных идентичных записей, называемых дубликатами.

Дедупликация является важной и неотъемлемой частью процесса предобработки и очистки данных, поэтому соответствующие инструменты входят в состав большинства аналитических платформ. Она производится на всех этапах работы с данными, начиная от ETL и заканчивая аналитическими модулями непосредственно перед анализом.

Необходимость многоступенчатой процедуры дедупликации обусловлена тем, что интеграция данных из различных источников, операции слияния, соединения и другие виды трансформации данных могут порождать новые дубликаты. Кроме этого, дубликаты могут появляться при неудачно выбранном алгоритме выборки (например, когда одно и то же наблюдение извлекается несколько раз при использовании выборки с возвратом).

Причинами, по которым дубликаты в большинстве случаев следует удалять, являются:

  1. Большое количество дубликатов увеличивает объем хранимых данных и перегружает сеть при их передаче, особенно если объем данных значителен.
  2. Дубликаты ухудшают репрезентативность данных.
  3. Дубликаты могут искажать форму распределения данных и вызвать смещение статистических оценок.

Исключение дубликатов может производиться тремя основными способами:

  1. Удаление дублирующих записей — следует использовать, если известно, что дублирование является следствием технического сбоя, человеческого фактора или некорректной интеграции данных (например, две или более записи в клиентской базе, содержащие идентичные персональные данные). Очевидно, что в этом случае дублирующие записи (кроме оригинальной) отражают реально не существующие объекты и события, что нарушает бизнес-логику. Поэтому такие дубликаты должны быть удалены, т.е. фактически произойдет их слияние в одну запись.
  2. Агрегирование дубликатов — объединение дубликатов в одну запись, если дубликаты отражают два или более реальных, но идентичных события или объекта. Например, товар отгружался со склада по накладной, но из-за большого объема отгрузка была произведена за два рейса в одинаковых объемах в пределах одной даты. В результате были зафиксированы две отгрузки в одинаковом объеме по одной накладной, что противоречит бизнес-логике. В этом случае нужно объединить две записи в одну, а количества и суммы сложить.
  3. Обогащение данных — заключается в добавлении к набору данных дополнительных полей, которые позволяют сделать записи уникальными, даже если ранее они были дубликатами. Например, для анализа потребовалась выборка, содержащая фамилию, инициалы, год рождения и стаж работы клиента. Очевидно, что в такой выборке может оказаться значительное количество дубликатов. Однако если добавить в выборку поле с номером паспорта, то все записи станут уникальными.

Следует отметить, что существуют ситуации, когда аналитиком может быть принято решение не обрабатывать дубликаты. Это может быть сделано, если:

  • дубликатов немного и они не нарушают бизнес-логику анализа;
  • объем выборки после удаления дубликатов становится недостаточным для анализа.

Кроме этого, в некоторых случаях дубликаты специально вводятся в данные, например, для увеличения числа примеров обучающего множества, если их критически мало для построения модели, или для балансировки классов в задачах бинарной классификации.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *