Массовая загрузка данных через Bulk API

В CRUD-операциях над документами мы добавляли документы по одному — каждый запрос отдельный HTTP-вызов. Для пары документов это нормально, но при загрузке тысяч записей такой подход становится узким местом: каждое TCP-соединение, разбор заголовков и ожидание ответа съедают время. Bulk API позволяет упаковать сколько угодно операций в один HTTP-запрос — принципиально другой уровень производительности.

Формат NDJSON: две строки на операцию

Bulk API принимает NDJSON (Newline Delimited JSON) — формат, где каждая строка это отдельный JSON-объект, а строки разделены символом \n. Никакого массива верхнего уровня и запятых между объектами: именно так Elasticsearch читает запрос потоково, не загружая всё тело в память сразу.

Для каждой операции — два объекта подряд:

{"<действие>": {"_index": "<индекс>", "_id": "<id>"}}
{"<тело документа или патч>"}

Исключение — delete: он не трогает содержимое документа, поэтому вторая строка не нужна.

Пример запроса с четырьмя разными операциями сразу:

POST /_bulk
Content-Type: application/x-ndjson

{"index": {"_index": "products", "_id": "1"}}
{"name": "Клавиатура механическая", "price": 7500, "in_stock": true}
{"create": {"_index": "products", "_id": "2"}}
{"name": "Коврик для мыши", "price": 800, "in_stock": true}
{"update": {"_index": "products", "_id": "1"}}
{"doc": {"price": 7800}}
{"delete": {"_index": "products", "_id": "99"}}

> Важно: тело запроса должно заканчиваться символом новой строки после последней строки. Без финального \n Elasticsearch вернёт ошибку разбора.

Если все операции идут в один индекс, его можно вынести в URL — тогда _index в строках-действиях можно опустить:

POST /products/_bulk

{"index": {"_id": "1"}}
{"name": "Клавиатура механическая", "price": 7500, "in_stock": true}
{"create": {"_id": "2"}}
{"name": "Коврик для мыши", "price": 800, "in_stock": true}
flowchart TD A["POST /_bulk"] --> B B["index: _id=1\n◀ строка-действие"] --> C C["name: Клавиатура, price: 7500\n◀ строка-тело"] --> D D["update: _id=1\n◀ строка-действие"] --> E E["doc: price→7800\n◀ строка-тело"] --> F F["delete: _id=99\n◀ строка-действие\n(строка-тело не нужна)"] style B fill:#dbeafe,stroke:#3b82f6 style D fill:#dbeafe,stroke:#3b82f6 style F fill:#dbeafe,stroke:#3b82f6 style C fill:#fef9c3,stroke:#eab308 style E fill:#fef9c3,stroke:#eab308
flowchart TD
    A["POST /_bulk"] --> B
    B["index: _id=1\n◀ строка-действие"] --> C
    C["name: Клавиатура, price: 7500\n◀ строка-тело"] --> D
    D["update: _id=1\n◀ строка-действие"] --> E
    E["doc: price→7800\n◀ строка-тело"] --> F
    F["delete: _id=99\n◀ строка-действие\n(строка-тело не нужна)"]
    style B fill:#dbeafe,stroke:#3b82f6
    style D fill:#dbeafe,stroke:#3b82f6
    style F fill:#dbeafe,stroke:#3b82f6
    style C fill:#fef9c3,stroke:#eab308
    style E fill:#fef9c3,stroke:#eab308
Структура NDJSON-тела Bulk-запроса: синим — строки-действия, жёлтым — строки-тела
Check yourself
Что произойдёт, если в конце Bulk-запроса не поставить символ новой строки после последней строки?

Четыре действия в деталях

index — создаёт документ или полностью заменяет существующий (аналог PUT /_doc/<id>). Используйте, когда хотите «положить данные как есть», не проверяя, есть ли уже документ с таким ID.

create — создаёт документ, но возвращает 409 Conflict, если документ с таким ID уже существует (аналог PUT /_create/<id>). Полезно при идемпотентной первичной загрузке: дубликат — это баг.

update — частичное обновление: в строке-теле передаётся объект {"doc": {...}} с полями, которые нужно изменить. Остальные поля документа остаются нетронутыми. Если документа не существует и не указан upsert — вернёт document_missing_exception.

delete — удаляет документ по ID. Строка-тело не нужна. Если документа нет, errors на уровне всего запроса останется false — ES сообщает "result": "not_found" только внутри items.

Check yourself
Вам нужно обновить только поле `price` до 9000 у документа с ID 5 через Bulk API. Как должна выглядеть строка-тело для действия `update`?

Разбор ответа

Bulk API всегда возвращает HTTP 200, даже если часть операций провалилась. Чтобы понять, всё ли прошло, смотрите на тело:

{
  "took": 12,
  "errors": false,
  "items": [
    {"index": {"_index": "products", "_id": "1", "_version": 1, "result": "created", "status": 201}},
    {"create": {"_index": "products", "_id": "2", "_version": 1, "result": "created", "status": 201}},
    {"update": {"_index": "products", "_id": "1", "_version": 2, "result": "updated", "status": 200}},
    {"delete": {"_index": "products", "_id": "99", "result": "not_found", "status": 404}}
  ]
}
Ответ на Bulk-запрос в Kibana Dev Tools: поля took, errors и массив items с результатом каждой операции.Source: https://opster.com/guides/elasticsearch/how-tos/elasticsearch-ingest-data/

Ключевые поля:

  • errors — булевый флаг верхнего уровня. false — всё прошло (или «не найдено» для delete). true — хотя бы одна операция упала. Именно этот флаг нужно проверять в коде — не HTTP-статус.
  • items — массив ответов по каждой операции в том же порядке, что и запрос.
  • status — HTTP-статус отдельной операции: 201 (создан), 200 (обновлён), 404 (не найден), 409 (конфликт версий).

Когда errors: true, найти проблемные операции удобно через jq:

curl -s -X POST "localhost:9200/products/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @data.ndjson | \
  jq '.items[] | select(.index.error != null) | .index'

Это отфильтрует только сбойные строки и покажет error.type и error.reason для каждой.

Типичные ошибки при заливке больших объёмов

mapper_parsing_exception — тип значения не совпал с маппингом. Например, в поле price (тип integer) пришла строка "дорого". ES не примет этот документ и вернёт ошибку в items[N].index.error, но остальные документы батча обработает нормально. Bulk API не атомарен: ошибка в одном документе не откатывает весь запрос.

version_conflict_engine_exception — вы используете оптимистичную блокировку (if_seq_no / if_primary_term), а документ уже изменили. Нужно повторить операцию с актуальными номерами.

document_missing_exceptionupdate на несуществующий документ без upsert. Решение: добавить "upsert": {...} в тело или использовать index вместо update.

Забытый Content-Type — без заголовка Content-Type: application/x-ndjson ES не поймёт формат и вернёт ошибку разбора. Kibana Dev Tools выставляет его автоматически; в curl нужно прописать явно.

Размер батча и производительность

Нет универсального числа документов на батч — важен объём в байтах. Практическое правило: 5–15 МБ на запрос. Слишком маленькие батчи дают лишние накладные расходы на HTTP; слишком большие создают нагрузку на память ноды.

Если кластер начинает возвращать 429 Too Many Requests, очередь записи переполнена — снижайте частоту запросов или добавляйте паузы между батчами.

При массовом первоначальном импорте можно временно ускорить загрузку, отключив авто-refresh и убрав реплики:

PUT /products/_settings
{
  "index": {
    "refresh_interval": "-1",
    "number_of_replicas": 0
  }
}

После окончания загрузки верните настройки ("refresh_interval": "1s", "number_of_replicas": 1) и сделайте принудительный refresh:

POST /products/_refresh
Quick recall
Что означает errors: true в верхнем уровне ответа Bulk API?
Quick recall
Какая разница между действиями index и create в Bulk API?
Quick recall
В Elasticsearch Bulk API, помимо delete, каждая операция требует ___ строк JSON.

См. также