je-ik commented on a change in pull request #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r829808452



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2246,27 +2286,43 @@ public static StatefulBatching fromSpec(BulkIO spec) {
         }
 
         return input
-            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow())))
+            .apply(ParDo.of(new Reshuffle.AssignShardFn<>(spec.getMaxParallelRequests())))
             .apply(groupIntoBatches);
       }
     }
 
     @Override
     public PCollectionTuple expand(PCollection<Document> input) {
       ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
-
       checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
 
+      PCollection<Document> docResults;
+      PCollection<Document> globalDocs = input.apply(Window.into(new GlobalWindows()));
+
       if (getUseStatefulBatches()) {
-        return input
-            .apply(StatefulBatching.fromSpec(this))
-            .apply(
-                ParDo.of(new BulkIOStatefulFn(this))
-                    .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults =
+            globalDocs
+                .apply(StatefulBatching.fromSpec(this))
+                .apply(ParDo.of(new BulkIOStatefulFn(this)));
       } else {
-        return input.apply(
-            ParDo.of(new BulkIOBundleFn(this))
-                .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+        docResults = globalDocs.apply(ParDo.of(new BulkIOBundleFn(this)));
+      }
+
+      return docResults
+          .setWindowingStrategyInternal(input.getWindowingStrategy())
+          .apply(
+              ParDo.of(new ResultFilteringFn())
+                  .withOutputTags(Write.SUCCESSFUL_WRITES, TupleTagList.of(Write.FAILED_WRITES)));
+    }
+
+    private static class ResultFilteringFn extends DoFn<Document, Document> {
+      @ProcessElement
+      public void processElement(@Element Document doc, MultiOutputReceiver out) {
+        if (doc.getHasError()) {
+          out.get(Write.FAILED_WRITES).outputWithTimestamp(doc, doc.getTimestamp());
+        } else {
+          out.get(Write.SUCCESSFUL_WRITES).outputWithTimestamp(doc, doc.getTimestamp());

Review comment:
       This looks like we still have the same issue, i.e. `outputWithTimestamp` 
being called with a timestamp earlier than that of the current element, no? 
This can be fixed using the deprecated `getAllowedTimestampSkew`, but 
essentially it demonstrates the same deficiency.
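   
   To make the constraint concrete, here is a minimal plain-Java sketch of the 
check as I understand it: an output timestamp is accepted only if it is no 
earlier than the current element's timestamp minus the allowed skew. The class 
and method names are hypothetical and this is a toy model, not Beam code; it 
also assumes the element carrying the document has a later timestamp than the 
document's original one.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of the output-timestamp check; hypothetical names, not Beam code. */
final class SkewCheckSketch {

  /** Accepts the output only if outputTs >= elementTs - allowedSkew. */
  static boolean isOutputAllowed(Instant elementTs, Instant outputTs, Duration allowedSkew) {
    return !outputTs.isBefore(elementTs.minus(allowedSkew));
  }

  public static void main(String[] args) {
    // Assumed scenario: the document's original timestamp is one minute
    // behind the timestamp of the element that now carries it.
    Instant elementTs = Instant.parse("2022-03-18T10:00:00Z");
    Instant originalTs = elementTs.minus(Duration.ofMinutes(1));

    // With zero skew the output is rejected.
    System.out.println(isOutputAllowed(elementTs, originalTs, Duration.ZERO)); // prints false
    // With a five-minute skew the same output is accepted.
    System.out.println(isOutputAllowed(elementTs, originalTs, Duration.ofMinutes(5))); // prints true
  }
}
```

   Raising the skew only silences the check; the element is still emitted 
with a timestamp that may already be behind the watermark.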
   
   Outputting with a timestamp that might be (at least in theory) arbitrarily 
far behind the output watermark means that if the user applies a stateful 
transform (e.g. GBK) downstream, some elements might get randomly dropped as 
late data (although they have actually been written).
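   
   A rough illustration of why such elements can disappear, again as a 
plain-Java toy model with hypothetical names rather than Beam code: it assumes 
epoch-aligned fixed windows and treats an element as dropped once the watermark 
has passed the end of its window plus the allowed lateness.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of late-data dropping in a fixed-window aggregation; not Beam code. */
final class LateDataSketch {

  /** End of the epoch-aligned fixed window that contains the timestamp. */
  static Instant windowEnd(Instant timestamp, Duration windowSize) {
    long size = windowSize.toMillis();
    long ts = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, size) + size);
  }

  /** Dropped once the watermark is past windowEnd + allowedLateness. */
  static boolean isDropped(
      Instant timestamp, Instant watermark, Duration windowSize, Duration allowedLateness) {
    return watermark.isAfter(windowEnd(timestamp, windowSize).plus(allowedLateness));
  }

  public static void main(String[] args) {
    // Assumed scenario: the write result is emitted with the document's
    // original timestamp while the watermark is already several minutes ahead.
    Instant originalTs = Instant.parse("2022-03-18T10:00:30Z");
    Instant watermark = Instant.parse("2022-03-18T10:05:00Z");
    Duration window = Duration.ofMinutes(1);

    // No allowed lateness: the element arrives too late and is dropped.
    System.out.println(isDropped(originalTs, watermark, window, Duration.ZERO)); // prints true
    // A large allowed lateness keeps it.
    System.out.println(isDropped(originalTs, watermark, window, Duration.ofMinutes(10))); // prints false
  }
}
```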
   
   I think we could work around this by modifying the ElasticsearchIO 
transform so that if stateful batching is not used, we return PDone instead. 
That way the user cannot rely on any guarantees about what has been written. 
Otherwise the user would have to ensure nothing gets dropped (by setting 
allowedLateness to some high value, but that looks like a highly suspicious 
side effect of the transform).
   
   I don't know what the correct solution is here. It seems to me that the 
root cause is a logical gap in the stateless DoFn and how we work with 
watermarks there.





