je-ik commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r829808452
##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2246,27 +2286,43 @@ public static StatefulBatching fromSpec(BulkIO spec) {
         }

         return input
-            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow())))
+            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequests())))
             .apply(groupIntoBatches);
       }
     }

     @Override
     public PCollectionTuple expand(PCollection<Document> input) {
       ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
-
       checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
+      PCollection<Document> docResults;
+      PCollection<Document> globalDocs = input.apply(Window.into(new GlobalWindows()));
+
       if (getUseStatefulBatches()) {
-        return input
-            .apply(StatefulBatching.fromSpec(this))
-            .apply(
-                ParDo.of(new BulkIOStatefulFn(this))
-                    .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults =
+            globalDocs
+                .apply(StatefulBatching.fromSpec(this))
+                .apply(ParDo.of(new BulkIOStatefulFn(this)));
       } else {
-        return input.apply(
-            ParDo.of(new BulkIOBundleFn(this))
-                .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults = globalDocs.apply(ParDo.of(new BulkIOBundleFn(this)));
+      }
+
+      return docResults
+          .setWindowingStrategyInternal(input.getWindowingStrategy())
+          .apply(
+              ParDo.of(new ResultFilteringFn())
+                  .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+    }
+
+    private static class ResultFilteringFn extends DoFn<Document, Document> {
+      @ProcessElement
+      public void processElement(@Element Document doc, MultiOutputReceiver out) {
+        if (doc.getHasError()) {
+          out.get(Write.FAILED_WRITES).outputWithTimestamp(doc, doc.getTimestamp());
+        } else {
+          out.get(Write.SUCCESSFUL_WRITES).outputWithTimestamp(doc, doc.getTimestamp());

Review comment:
This looks like we still have the same
issue with "outputWithTimestamp being before the current element timestamp", no? This can be fixed using the deprecated `getAllowedTimestampSkew`, but essentially it demonstrates the same deficiency. Outputting with a timestamp that might be (at least in theory) arbitrarily late relative to the output watermark means that if the user applies a stateful transform (e.g. GBK) downstream, some elements might get randomly dropped, even though they have actually been written.

I think we could work around this by modifying the ElasticsearchIO transform so that, if stateful batching is not used, we return PDone instead. That way the user cannot rely on any guarantees about what has been written. Otherwise we would have to ensure nothing gets dropped (by setting allowedLateness to some high value, but that looks like a highly suspicious side effect of the transform).

I don't know what the correct solution is here. It seems to me that the root cause is the logical gap in stateless DoFns and how we work with watermarks there.
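
To make the dropping concern concrete, here is a minimal, self-contained sketch in plain Java. It is a toy model, not Beam code: `LateDataSketch`, `isDroppedAsLate` and the fixed-window arithmetic are hypothetical names and simplifications introduced only for illustration. It assumes a downstream fixed-window grouping that discards an element once the watermark has passed the last timestamp of the element's window plus the allowed lateness.

```java
/** Toy model (NOT the Beam API) of late-data dropping in a downstream grouping step. */
final class LateDataSketch {

  /**
   * Returns true if an element would be discarded as late by a fixed-window grouping.
   * All values are event-time milliseconds. Simplifying assumption: a window
   * [start, end) expires once (end - 1) + allowedLateness is before the watermark.
   */
  static boolean isDroppedAsLate(
      long elementTimestamp, long windowSize, long watermark, long allowedLateness) {
    // End of the fixed window that contains the element.
    long windowEnd = (Math.floorDiv(elementTimestamp, windowSize) + 1) * windowSize;
    long maxTimestamp = windowEnd - 1;
    return maxTimestamp + allowedLateness < watermark;
  }

  public static void main(String[] args) {
    long windowSize = 60_000L; // hypothetical one-minute windows
    long watermark = 120_000L; // the watermark moved on while the bulk request was pending

    // A document written successfully, but re-emitted with its original (old) timestamp.
    long docTimestamp = 10_000L;

    System.out.println(
        "allowedLateness=0 dropped: "
            + isDroppedAsLate(docTimestamp, windowSize, watermark, 0L));
    System.out.println(
        "allowedLateness=1h dropped: "
            + isDroppedAsLate(docTimestamp, windowSize, watermark, 3_600_000L));
  }
}
```

In this model the document is dropped by the downstream grouping unless the user raises the allowed lateness, which is the side effect described above: the write succeeded, yet the result element never reaches the user's aggregation.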