Массовая загрузка данных через Bulk API
В CRUD-операциях над документами мы добавляли документы по одному — каждый запрос отдельный HTTP-вызов. Для пары документов это нормально, но при загрузке тысяч записей такой подход становится узким местом: каждое TCP-соединение, разбор заголовков и ожидание ответа съедают время. Bulk API позволяет упаковать сколько угодно операций в один HTTP-запрос — принципиально другой уровень производительности.
Формат NDJSON: две строки на операцию
Bulk API принимает NDJSON (Newline Delimited JSON) — формат, где каждая строка это отдельный JSON-объект, а строки разделены символом \n. Никакого массива верхнего уровня и запятых между объектами: именно так Elasticsearch читает запрос потоково, не загружая всё тело в память сразу.
Для каждой операции — два объекта подряд:
{"<действие>": {"_index": "<индекс>", "_id": "<id>"}}
{"<тело документа или патч>"}Исключение — delete: он не трогает содержимое документа, поэтому вторая строка не нужна.
Пример запроса с четырьмя разными операциями сразу:
POST /_bulk
Content-Type: application/x-ndjson
{"index": {"_index": "products", "_id": "1"}}
{"name": "Клавиатура механическая", "price": 7500, "in_stock": true}
{"create": {"_index": "products", "_id": "2"}}
{"name": "Коврик для мыши", "price": 800, "in_stock": true}
{"update": {"_index": "products", "_id": "1"}}
{"doc": {"price": 7800}}
{"delete": {"_index": "products", "_id": "99"}}> Важно: тело запроса должно заканчиваться символом новой строки после последней строки. Без финального \n Elasticsearch вернёт ошибку разбора.
Если все операции идут в один индекс, его можно вынести в URL — тогда _index в строках-действиях можно опустить:
POST /products/_bulk
{"index": {"_id": "1"}}
{"name": "Клавиатура механическая", "price": 7500, "in_stock": true}
{"create": {"_id": "2"}}
{"name": "Коврик для мыши", "price": 800, "in_stock": true}flowchart TD
A["POST /_bulk"] --> B
B["index: _id=1\n◀ строка-действие"] --> C
C["name: Клавиатура, price: 7500\n◀ строка-тело"] --> D
D["update: _id=1\n◀ строка-действие"] --> E
E["doc: price→7800\n◀ строка-тело"] --> F
F["delete: _id=99\n◀ строка-действие\n(строка-тело не нужна)"]
style B fill:#dbeafe,stroke:#3b82f6
style D fill:#dbeafe,stroke:#3b82f6
style F fill:#dbeafe,stroke:#3b82f6
style C fill:#fef9c3,stroke:#eab308
style E fill:#fef9c3,stroke:#eab308Четыре действия в деталях
index — создаёт документ или полностью заменяет существующий (аналог PUT /_doc/<id>). Используйте, когда хотите «положить данные как есть», не проверяя, есть ли уже документ с таким ID.
create — создаёт документ, но возвращает 409 Conflict, если документ с таким ID уже существует (аналог PUT /_create/<id>). Полезно при идемпотентной первичной загрузке: дубликат — это баг.
update — частичное обновление: в строке-теле передаётся объект {"doc": {...}} с полями, которые нужно изменить. Остальные поля документа остаются нетронутыми. Если документа не существует и не указан upsert — вернёт document_missing_exception.
delete — удаляет документ по ID. Строка-тело не нужна. Если документа нет, errors на уровне всего запроса останется false — ES сообщает "result": "not_found" только внутри items.
Разбор ответа
Bulk API всегда возвращает HTTP 200, даже если часть операций провалилась. Чтобы понять, всё ли прошло, смотрите на тело:
{
"took": 12,
"errors": false,
"items": [
{"index": {"_index": "products", "_id": "1", "_version": 1, "result": "created", "status": 201}},
{"create": {"_index": "products", "_id": "2", "_version": 1, "result": "created", "status": 201}},
{"update": {"_index": "products", "_id": "1", "_version": 2, "result": "updated", "status": 200}},
{"delete": {"_index": "products", "_id": "99", "result": "not_found", "status": 404}}
]
}
Ключевые поля:
errors— булевый флаг верхнего уровня.false— всё прошло (или «не найдено» дляdelete).true— хотя бы одна операция упала. Именно этот флаг нужно проверять в коде — не HTTP-статус.items— массив ответов по каждой операции в том же порядке, что и запрос.status— HTTP-статус отдельной операции:201(создан),200(обновлён),404(не найден),409(конфликт версий).
Когда errors: true, найти проблемные операции удобно через jq:
curl -s -X POST "localhost:9200/products/_bulk" \
-H "Content-Type: application/x-ndjson" \
--data-binary @data.ndjson | \
jq '.items[] | select(.index.error != null) | .index'Это отфильтрует только сбойные строки и покажет error.type и error.reason для каждой.
Типичные ошибки при заливке больших объёмов
mapper_parsing_exception — тип значения не совпал с маппингом. Например, в поле price (тип integer) пришла строка "дорого". ES не примет этот документ и вернёт ошибку в items[N].index.error, но остальные документы батча обработает нормально. Bulk API не атомарен: ошибка в одном документе не откатывает весь запрос.
version_conflict_engine_exception — вы используете оптимистичную блокировку (if_seq_no / if_primary_term), а документ уже изменили. Нужно повторить операцию с актуальными номерами.
document_missing_exception — update на несуществующий документ без upsert. Решение: добавить "upsert": {...} в тело или использовать index вместо update.
Забытый Content-Type — без заголовка Content-Type: application/x-ndjson ES не поймёт формат и вернёт ошибку разбора. Kibana Dev Tools выставляет его автоматически; в curl нужно прописать явно.
Размер батча и производительность
Нет универсального числа документов на батч — важен объём в байтах. Практическое правило: 5–15 МБ на запрос. Слишком маленькие батчи дают лишние накладные расходы на HTTP; слишком большие создают нагрузку на память ноды.
Если кластер начинает возвращать 429 Too Many Requests, очередь записи переполнена — снижайте частоту запросов или добавляйте паузы между батчами.
При массовом первоначальном импорте можно временно ускорить загрузку, отключив авто-refresh и убрав реплики:
PUT /products/_settings
{
"index": {
"refresh_interval": "-1",
"number_of_replicas": 0
}
}После окончания загрузки верните настройки ("refresh_interval": "1s", "number_of_replicas": 1) и сделайте принудительный refresh:
POST /products/_refresh