echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r616672005
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1453,14 +2062,23 @@ private HttpEntity handleRetry(
// while retry policy exists
while (BackOffUtils.next(sleeper, backoff)) {
LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
- Request request = new Request(method, endpoint);
- request.addParameters(params);
- request.setEntity(requestBody);
- response = restClient.performRequest(request);
- responseEntity = new BufferedHttpEntity(response.getEntity());
+ try {
+ Request request = new Request(method, endpoint);
+ request.addParameters(params);
+ request.setEntity(requestBody);
+ response = restClient.performRequest(request);
+ responseEntity = new BufferedHttpEntity(response.getEntity());
+ } catch (java.io.IOException ex) {
Review comment:
Before, we just threw the exception, and retries happened only on
HTTP 429 (the predicate). Now retries are also done whenever an IOException is received.
Are you sure all IOException cases can be retried? I'm not sure they are all
timeouts: a misconfigured IO will also throw an IOException and will be retried. It is
good to retry on timeouts IMHO, but please filter so that only timeouts are retried.
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1310,135 +1970,84 @@ public void startBundle(StartBundleContext context) {
currentBatchSizeBytes = 0;
}
- private class DocumentMetadataSerializer extends
StdSerializer<DocumentMetadata> {
-
- private DocumentMetadataSerializer() {
- super(DocumentMetadata.class);
- }
-
- @Override
- public void serialize(
- DocumentMetadata value, JsonGenerator gen, SerializerProvider
provider)
- throws IOException {
- gen.writeStartObject();
- if (value.index != null) {
- gen.writeStringField("_index", value.index);
- }
- if (value.type != null) {
- gen.writeStringField("_type", value.type);
- }
- if (value.id != null) {
- gen.writeStringField("_id", value.id);
- }
- if (value.retryOnConflict != null && (backendVersion <= 6)) {
- gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
- }
- if (value.retryOnConflict != null && backendVersion >= 7) {
- gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
- }
- gen.writeEndObject();
- }
- }
- /**
- * Extracts the components that comprise the document address from the
document using the
- * {@link FieldValueExtractFn} configured. This allows any or all of the
index, type and
- * document id to be controlled on a per document basis. Sanitization of
the index is
- * performed, automatically lower-casing the value as required by
Elasticsearch.
- *
- * @param parsedDocument the json from which the index, type and id may
be extracted
- * @return the document address as JSON or the default
- * @throws IOException if the document cannot be parsed as JSON
- */
- private String getDocumentMetadata(JsonNode parsedDocument) throws
IOException {
- DocumentMetadata metadata =
- new DocumentMetadata(
- spec.getIndexFn() != null
- ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
- : null,
- spec.getTypeFn() != null ?
spec.getTypeFn().apply(parsedDocument) : null,
- spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument)
: null,
- spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null);
- return OBJECT_MAPPER.writeValueAsString(metadata);
- }
-
- private static String lowerCaseOrNull(String input) {
- return input == null ? null : input.toLowerCase();
+ @FinishBundle
+ public void finishBundle(FinishBundleContext context)
+ throws IOException, InterruptedException {
+ flushBatch();
}
@ProcessElement
- public void processElement(ProcessContext context) throws Exception {
- String document = context.element(); // use configuration and
auto-generated document IDs
- String documentMetadata = "{}";
- boolean isDelete = false;
- if (spec.getIndexFn() != null || spec.getTypeFn() != null ||
spec.getIdFn() != null) {
- // parse once and reused for efficiency
- JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
- documentMetadata = getDocumentMetadata(parsedDocument);
- if (spec.getIsDeleteFn() != null) {
- isDelete = spec.getIsDeleteFn().apply(parsedDocument);
- }
+ public void processElement(@Element @NonNull Iterable<String>
bulkApiEntities)
+ throws Exception {
+ for (String bulkApiEntity : bulkApiEntities) {
+ addAndMaybeFlush(bulkApiEntity);
}
+ }
- if (isDelete) {
- // delete request used for deleting a document.
- batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));
- } else {
- // index is an insert/upsert and update is a partial update (or
insert if not existing)
- if (spec.getUsePartialUpdate()) {
- batch.add(
- String.format(
- "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" :
true }%n",
- documentMetadata, document));
- } else {
- batch.add(String.format("{ \"index\" : %s }%n%s%n",
documentMetadata, document));
- }
- }
+ protected void addAndMaybeFlush(String bulkApiEntity)
+ throws IOException, InterruptedException {
+ batch.add(bulkApiEntity);
+ currentBatchSizeBytes +=
bulkApiEntity.getBytes(StandardCharsets.UTF_8).length;
- currentBatchSizeBytes +=
document.getBytes(StandardCharsets.UTF_8).length;
if (batch.size() >= spec.getMaxBatchSize()
|| currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
flushBatch();
}
}
- @FinishBundle
- public void finishBundle(FinishBundleContext context)
- throws IOException, InterruptedException {
- flushBatch();
- }
-
private void flushBatch() throws IOException, InterruptedException {
if (batch.isEmpty()) {
return;
}
+
+ LOG.info(
+ "ElasticsearchIO batch size: {}, batch size bytes: {}",
+ batch.size(),
+ currentBatchSizeBytes);
+
StringBuilder bulkRequest = new StringBuilder();
for (String json : batch) {
bulkRequest.append(json);
}
+
batch.clear();
- currentBatchSizeBytes = 0;
- Response response;
- HttpEntity responseEntity;
- // Elasticsearch will default to the index/type provided here if none
are set in the
- // document meta (i.e. using ElasticsearchIO$Write#withIndexFn and
- // ElasticsearchIO$Write#withTypeFn options)
- String endPoint =
- String.format(
- "/%s/%s/_bulk",
- spec.getConnectionConfiguration().getIndex(),
- spec.getConnectionConfiguration().getType());
+ currentBatchSizeBytes = 0L;
+
+ Response response = null;
+ HttpEntity responseEntity = null;
+
+ // Elasticsearch will default to the index/type provided the {@link
+ // ConnectionConfiguration} if none are set in the document meta (i.e.
+ // using ElasticsearchIO$DocToBulk#withIndexFn and
+ // ElasticsearchIO$DocToBulk#withTypeFn options)
+ String endPoint = spec.getConnectionConfiguration().getBulkEndPoint();
+
HttpEntity requestBody =
new NStringEntity(bulkRequest.toString(),
ContentType.APPLICATION_JSON);
- Request request = new Request("POST", endPoint);
- request.addParameters(Collections.emptyMap());
- request.setEntity(requestBody);
- response = restClient.performRequest(request);
- responseEntity = new BufferedHttpEntity(response.getEntity());
+ try {
+ Request request = new Request("POST", endPoint);
+ request.addParameters(Collections.emptyMap());
+ request.setEntity(requestBody);
+ response = restClient.performRequest(request);
+ responseEntity = new BufferedHttpEntity(response.getEntity());
+ } catch (java.io.IOException ex) {
Review comment:
Same here: please retry only on timeouts rather than on every IOException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]