[ 
https://issues.apache.org/jira/browse/BEAM-14064?focusedWorklogId=765665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765665
 ]

ASF GitHub Bot logged work on BEAM-14064:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/22 19:22
            Start Date: 03/May/22 19:22
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on code in PR #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r864108077


##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -1997,8 +2012,12 @@ abstract static class Builder {
 
       abstract Builder setUseStatefulBatches(boolean useStatefulBatches);
 
+      /** @deprecated Use {@link this#setMaxParallelRequests} instead. */

Review Comment:
   ```suggestion
         /** @deprecated Use {@link #setMaxParallelRequests} instead. */
   ```
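
The reason for the suggested change is that Javadoc resolves `{@link #member}` against the enclosing class, while `this#member` is not valid reference syntax. A minimal sketch of the deprecation pattern follows; the class `RequestSettingsBuilder` and its getter are hypothetical stand-ins for illustration, not the actual AutoValue builder in ElasticsearchIO.

```java
/** Hypothetical builder used only to illustrate the Javadoc reference syntax. */
final class RequestSettingsBuilder {
  private int maxParallelRequests = 1;

  /** Sets the maximum number of parallel bulk requests. */
  RequestSettingsBuilder setMaxParallelRequests(int maxParallelRequests) {
    this.maxParallelRequests = maxParallelRequests;
    return this;
  }

  /** @deprecated Use {@link #setMaxParallelRequests} instead. */
  @Deprecated
  RequestSettingsBuilder setMaxParallelRequestsPerWindow(int maxParallelRequests) {
    // Delegate so that the old and new setters configure the same value.
    return setMaxParallelRequests(maxParallelRequests);
  }

  int getMaxParallelRequests() {
    return maxParallelRequests;
  }
}
```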



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
      *
-     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
-     *     of data
+     * @param maxParallelRequests the maximum number of parallel bulk requests 
for a window of data
      * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
      */
-    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+    @Deprecated
+    public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
       checkArgument(
-          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
-      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+          maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be 
a positive integer");
+      return builder().setMaxParallelRequests(maxParallelRequests).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work

Review Comment:
   ```suggestion
        * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests, but may not work
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.

Review Comment:
   ```suggestion
        * runners.
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
      *
-     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
-     *     of data
+     * @param maxParallelRequests the maximum number of parallel bulk requests 
for a window of data
      * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
      */
-    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+    @Deprecated
+    public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
       checkArgument(
-          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
-      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+          maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be 
a positive integer");
+      return builder().setMaxParallelRequests(maxParallelRequests).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.

Review Comment:
   ```suggestion
        * runners.
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work

Review Comment:
   ```suggestion
        * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests, but may not work
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2467,37 +2509,14 @@ private Multimap<BoundedWindow, Document> flushBatch()
             createWriteReport(
                 responseEntity, spec.getAllowedResponseErrors(), 
spec.getThrowWriteErrors());
 
-        return mergeInputsAndResponses(inputEntries, responses);
-      }
-
-      private static Multimap<BoundedWindow, Document> mergeInputsAndResponses(
-          List<Entry<BoundedWindow, Document>> inputs, List<Document> 
responses) {
-
-        checkArgument(
-            inputs.size() == responses.size(), "inputs and responses must be 
of same size");
-
-        Multimap<BoundedWindow, Document> results = ArrayListMultimap.create();
-
-        // N.B. the order of responses must always match the order of inputs
-        for (int i = 0; i < inputs.size(); i++) {
-          BoundedWindow outputWindow = inputs.get(i).getKey();
-
-          // Contains raw input document and Bulk directive counterpart only
-          Document inputDoc = inputs.get(i).getValue();
-
-          // Contains stringified JSON response from Elasticsearch and error 
status only
-          Document outputDoc = responses.get(i);
-
-          // Create a new Document object with all the input fields from 
inputDoc (i.e. the raw
-          // input JSON string) and all the response fields from ES bulk API 
for that input document
-          Document merged =
-              inputDoc
-                  .withHasError(outputDoc.getHasError())
-                  .withResponseItemJson(outputDoc.getResponseItemJson());
-          results.put(outputWindow, merged);
-        }
-
-        return results;
+        return Streams.zip(

Review Comment:
   nit: Java streams rarely perform better than a plain loop, and they 
typically hurt readability.
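
For comparison, the index-based merge that this hunk removes can be sketched with only the standard library. This is a simplified stand-in rather than the PR's code: windows and documents are modeled as plain strings, a `Map` of lists replaces the `Multimap`, and the class name `PairwiseMerge` is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PairwiseMerge {
  /**
   * Pairs each input (window label plus document) with the response at the
   * same index and groups the merged results by the input's window.
   */
  static Map<String, List<String>> mergeInputsAndResponses(
      List<Map.Entry<String, String>> inputs, List<String> responses) {
    if (inputs.size() != responses.size()) {
      throw new IllegalArgumentException("inputs and responses must be of same size");
    }
    Map<String, List<String>> results = new LinkedHashMap<>();
    // The order of responses must match the order of inputs.
    for (int i = 0; i < inputs.size(); i++) {
      String window = inputs.get(i).getKey();
      String merged = inputs.get(i).getValue() + " -> " + responses.get(i);
      results.computeIfAbsent(window, w -> new ArrayList<>()).add(merged);
    }
    return results;
  }

  public static void main(String[] args) {
    Map<String, List<String>> merged =
        mergeInputsAndResponses(
            List.of(Map.entry("w1", "docA"), Map.entry("w2", "docB"), Map.entry("w1", "docC")),
            List.of("ok", "error", "ok"));
    System.out.println(merged);
  }
}
```

Keeping the window as the grouping key is what lets each result be emitted later in the window that produced it.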



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
      *
-     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
-     *     of data
+     * @param maxParallelRequests the maximum number of parallel bulk requests 
for a window of data
      * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
      */
-    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+    @Deprecated
+    public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
       checkArgument(
-          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
-      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+          maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be 
a positive integer");
+      return builder().setMaxParallelRequests(maxParallelRequests).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
+     *
+     * @param maxParallelRequests the maximum number of parallel bulk requests
+     * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set

Review Comment:
   ```suggestion
        * @return the {@link BulkIO} with maximum parallel bulk requests
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765665)
    Time Spent: 7h  (was: 6h 50m)

> ElasticSearchIO#Write buffering and outputting across windows
> -------------------------------------------------------------
>
>                 Key: BEAM-14064
>                 URL: https://issues.apache.org/jira/browse/BEAM-14064
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>    Affects Versions: 2.35.0, 2.36.0, 2.37.0
>            Reporter: Luke Cwik
>            Assignee: Evan Galpin
>            Priority: P1
>             Fix For: 2.39.0
>
>          Time Spent: 7h
>  Remaining Estimate: 0h
>
> Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> Bug PR: https://github.com/apache/beam/pull/15381
> ElasticsearchIO is collecting results from elements in window X and then 
> trying to output them in window Y when flushing the batch. This exposed a bug 
> where buffered elements were being output as part of a different window than 
> the one that produced them.
> This became visible because validation was added recently to ensure that, when 
> the pipeline is processing elements in window X, any output with a timestamp 
> is valid for window X. Note that this validation only occurs in 
> *@ProcessElement*, since output there is associated with the window of the 
> input element that is being processed.
> It is OK to do this in *@FinishBundle*, since there is no existing windowing 
> context and, when you output, the element is assigned to an appropriate window.
> *Further Context*
> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain 
> it’s this PR https://github.com/apache/beam/pull/15381
> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a 
> streaming job. The config for the source and sink is, respectively,
> {noformat}
> pipeline.apply(
>             PubsubIO.readStrings().fromSubscription(subscription)
>         ).apply(ParseJsons.of(OurObject::class.java))
>             .setCoder(KryoCoder.of())
> {noformat}
> and
> {noformat}
> ElasticsearchIO.write()
>             .withUseStatefulBatches(true)
>             .withMaxParallelRequestsPerWindow(1)
>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>             // 5 bytes -> KiB -> MiB, so 5 MiB
>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>             // # of docs
>             .withMaxBatchSize(1000)
>             .withConnectionConfiguration(
>                 ElasticsearchIO.ConnectionConfiguration.create(
>                     arrayOf(host),
>                     "fubar",
>                     "_doc"
>                 ).withConnectTimeout(5000)
>                     .withSocketTimeout(30000)
>             )
>             .withRetryConfiguration(
>                 ElasticsearchIO.RetryConfiguration.create(
>                     10,
>                     // the duration is wall clock, against the connection and 
> socket timeouts specified
>                     // above. I.e., 10 x 30s is gonna be more than 3 minutes, 
> so if we're getting
>                     // 10 socket timeouts in a row, this would ignore the 
> "10" part and terminate
>                     // after 6. The idea is that in a mixed failure mode, 
> you'd get different timeouts
>                     // of different durations, and on average 10 x fails < 4m.
>                     // That said, 4m is arbitrary, so adjust as and when 
> needed.
>                     Duration.standardMinutes(4)
>                 )
>             )
>             .withIdFn { f: JsonNode -> f["id"].asText() }
>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == 
> "delete" }
> {noformat}
> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in the 
> consumer, due to alleged time skew, specifically
> {noformat}
> 2022-03-07 10:48:37.886 GMT Error message from worker: 
> java.lang.IllegalArgumentException: Cannot output with timestamp 
> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the 
> timestamp of the 
> current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 
> milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the 
> DoFn#getAllowedTimestampSkew() Javadoc 
> for details on changing the allowed skew. 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>  
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>  
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>  
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
> {noformat}
> I’ve bisected it and 2.34 works fine, 2.35 is the first version this breaks, 
> and it seems like the code in the trace is largely added by the PR linked 
> above. The error usually claims a skew of a few seconds, but obviously I 
> can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and 
> it’s marked deprecated anyway.
> I’m happy to raise a JIRA but I’m not 100% sure what the code was intending 
> to fix, and additionally, I’d also be happy if someone else can reproduce 
> this or knows of similar reports. I feel like what we’re doing is not that 
> uncommon a scenario, so I would have thought someone else would have hit this 
> by now.
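
The lower-bound rule stated in the error message quoted above (output timestamp no earlier than the current input timestamp minus the allowed skew) can be checked with the two timestamps from that log. The sketch below is a standalone illustration using `java.time`, not Beam's implementation; the class and method names are hypothetical, and the upper bound mentioned in the message is deliberately left out.

```java
import java.time.Duration;
import java.time.Instant;

final class TimestampSkewCheck {
  /** Returns true if output is no earlier than currentInput minus allowedSkew. */
  static boolean isOutputTimestampAllowed(
      Instant output, Instant currentInput, Duration allowedSkew) {
    return !output.isBefore(currentInput.minus(allowedSkew));
  }

  /** Returns the smallest skew that would have let the output through. */
  static Duration requiredSkew(Instant output, Instant currentInput) {
    Duration lag = Duration.between(output, currentInput);
    return lag.isNegative() ? Duration.ZERO : lag;
  }

  public static void main(String[] args) {
    // Timestamps copied from the quoted error message.
    Instant output = Instant.parse("2022-03-07T10:43:38.640Z");
    Instant currentInput = Instant.parse("2022-03-07T10:43:43.562Z");

    System.out.println(isOutputTimestampAllowed(output, currentInput, Duration.ZERO));
    System.out.println(requiredSkew(output, currentInput).toMillis() + " ms");
  }
}
```

With these values the buffered element lags the current input by 4.922 seconds, which matches the report that the error usually claims a skew of a few seconds.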



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
