[
https://issues.apache.org/jira/browse/BEAM-14064?focusedWorklogId=765665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765665
]
ASF GitHub Bot logged work on BEAM-14064:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/22 19:22
Start Date: 03/May/22 19:22
Worklog Time Spent: 10m
Work Description: lukecwik commented on code in PR #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r864108077
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -1997,8 +2012,12 @@ abstract static class Builder {
abstract Builder setUseStatefulBatches(boolean useStatefulBatches);
+ /** @deprecated Use {@link this#setMaxParallelRequests} instead. */
Review Comment:
```suggestion
/** @deprecated Use {@link #setMaxParallelRequests} instead. */
```
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
*
- * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window
- * of data
+ * @param maxParallelRequests the maximum number of parallel bulk requests for a window of data
* @return the {@link BulkIO} with maximum parallel bulk requests per window set
+ * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
*/
- public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
+ @Deprecated
+ public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
checkArgument(
- maxParallelRequestsPerWindow > 0, "parameter value must be positive " + "a integer");
- return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+ maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be a positive integer");
+ return builder().setMaxParallelRequests(maxParallelRequests).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for @param
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
Review Comment:
```suggestion
* ensuring an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work
```
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
Review Comment:
```suggestion
* runners.
```
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
*
- * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window
- * of data
+ * @param maxParallelRequests the maximum number of parallel bulk requests for a window of data
* @return the {@link BulkIO} with maximum parallel bulk requests per window set
+ * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
*/
- public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
+ @Deprecated
+ public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
checkArgument(
- maxParallelRequestsPerWindow > 0, "parameter value must be positive " + "a integer");
- return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+ maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be a positive integer");
+ return builder().setMaxParallelRequests(maxParallelRequests).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for @param
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
Review Comment:
```suggestion
* runners.
```
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean
useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing,
states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for
@param
- * maxParallelRequestsPerWindow, in particular if the input data has a
finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1
request in flight. Having
- * only a single request in flight can be beneficial for ensuring an
Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use
cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work
will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite
number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally
windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to
1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can
be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel
requests,but may not work
Review Comment:
```suggestion
* ensuring an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work
```
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2467,37 +2509,14 @@ private Multimap<BoundedWindow, Document> flushBatch()
createWriteReport(
responseEntity, spec.getAllowedResponseErrors(), spec.getThrowWriteErrors());
- return mergeInputsAndResponses(inputEntries, responses);
- }
-
- private static Multimap<BoundedWindow, Document> mergeInputsAndResponses(
- List<Entry<BoundedWindow, Document>> inputs, List<Document> responses) {
-
- checkArgument(
- inputs.size() == responses.size(), "inputs and responses must be of same size");
-
- Multimap<BoundedWindow, Document> results = ArrayListMultimap.create();
-
- // N.B. the order of responses must always match the order of inputs
- for (int i = 0; i < inputs.size(); i++) {
- BoundedWindow outputWindow = inputs.get(i).getKey();
-
- // Contains raw input document and Bulk directive counterpart only
- Document inputDoc = inputs.get(i).getValue();
-
- // Contains stringified JSON response from Elasticsearch and error status only
- Document outputDoc = responses.get(i);
-
- // Create a new Document object with all the input fields from inputDoc (i.e. the raw
- // input JSON string) and all the response fields from ES bulk API for that input document
- Document merged =
- inputDoc
- .withHasError(outputDoc.getHasError())
- .withResponseItemJson(outputDoc.getResponseItemJson());
- results.put(outputWindow, merged);
- }
-
- return results;
+ return Streams.zip(
Review Comment:
nit: it is rare that Java streams perform better and they typically hurt readability
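To illustrate the reviewer's point outside Beam: the pairwise merge that `mergeInputsAndResponses` performed reads just as clearly as a plain indexed loop, with no Guava `Streams.zip` needed. This is a hedged, self-contained sketch with simplified stand-in types (plain strings), not Beam's `Document`/`BoundedWindow`:

```java
import java.util.ArrayList;
import java.util.List;

public class PairwiseMerge {
  /**
   * Merges two equal-length lists element by element, preserving order,
   * in the indexed-loop style the removed mergeInputsAndResponses used.
   */
  static List<String> merge(List<String> inputs, List<String> responses) {
    if (inputs.size() != responses.size()) {
      throw new IllegalArgumentException("inputs and responses must be of same size");
    }
    List<String> merged = new ArrayList<>();
    // N.B. the order of responses must always match the order of inputs
    for (int i = 0; i < inputs.size(); i++) {
      merged.add(inputs.get(i) + ":" + responses.get(i));
    }
    return merged;
  }

  public static void main(String[] args) {
    System.out.println(merge(List.of("doc1", "doc2"), List.of("ok", "error")));
  }
}
```

The loop also makes the size precondition and ordering invariant explicit, which a stream pipeline tends to bury.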
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
*
- * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window
- * of data
+ * @param maxParallelRequests the maximum number of parallel bulk requests for a window of data
* @return the {@link BulkIO} with maximum parallel bulk requests per window set
+ * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
*/
- public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
+ @Deprecated
+ public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
checkArgument(
- maxParallelRequestsPerWindow > 0, "parameter value must be positive " + "a integer");
- return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+ maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be a positive integer");
+ return builder().setMaxParallelRequests(maxParallelRequests).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for @param
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
+ *
+ * @param maxParallelRequests the maximum number of parallel bulk requests
+ * @return the {@link BulkIO} with maximum parallel bulk requests per window set
Review Comment:
```suggestion
* @return the {@link BulkIO} with maximum parallel bulk requests
```
Issue Time Tracking
-------------------
Worklog Id: (was: 765665)
Time Spent: 7h (was: 6h 50m)
> ElasticSearchIO#Write buffering and outputting across windows
> -------------------------------------------------------------
>
> Key: BEAM-14064
> URL: https://issues.apache.org/jira/browse/BEAM-14064
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Luke Cwik
> Assignee: Evan Galpin
> Priority: P1
> Fix For: 2.39.0
>
> Time Spent: 7h
> Remaining Estimate: 0h
>
> Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> Bug PR: https://github.com/apache/beam/pull/15381
> ElasticsearchIO is collecting results from elements in window X and then
> trying to output them in window Y when flushing the batch. This exposed a bug
> where buffered elements were being output as part of a different window than
> the one that produced them.
> This became visible because validation was added recently to ensure that when
> the pipeline is processing elements in window X, any output with a timestamp
> is valid for window X. Note that this validation only occurs in
> *@ProcessElement*, since output is associated with the window of the input
> element that is being processed.
> It is ok to do this in *@FinishBundle*, since there is no existing windowing
> context and, when you output, the element is assigned to an appropriate window.
> *Further Context*
> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain
> it’s this PR https://github.com/apache/beam/pull/15381
> Our scenario is pretty trivial, we read off Pubsub and write to Elastic in a
> streaming job, the config for the source and sink is respectively
> {noformat}
> pipeline.apply(
> PubsubIO.readStrings().fromSubscription(subscription)
> ).apply(ParseJsons.of(OurObject::class.java))
> .setCoder(KryoCoder.of())
> {noformat}
> and
> {noformat}
> ElasticsearchIO.write()
> .withUseStatefulBatches(true)
> .withMaxParallelRequestsPerWindow(1)
> .withMaxBufferingDuration(Duration.standardSeconds(30))
> // 5 bytes -> KiB -> MiB, so 5 MiB
> .withMaxBatchSizeBytes(5L * 1024 * 1024)
> // # of docs
> .withMaxBatchSize(1000)
> .withConnectionConfiguration(
> ElasticsearchIO.ConnectionConfiguration.create(
> arrayOf(host),
> "fubar",
> "_doc"
> ).withConnectTimeout(5000)
> .withSocketTimeout(30000)
> )
> .withRetryConfiguration(
> ElasticsearchIO.RetryConfiguration.create(
> 10,
> // the duration is wall clock, against the connection and
> socket timeouts specified
> // above. I.e., 10 x 30s is gonna be more than 3 minutes,
> so if we're getting
> // 10 socket timeouts in a row, this would ignore the
> "10" part and terminate
> // after 6. The idea is that in a mixed failure mode,
> you'd get different timeouts
> // of different durations, and on average 10 x fails < 4m.
> // That said, 4m is arbitrary, so adjust as and when
> needed.
> Duration.standardMinutes(4)
> )
> )
> .withIdFn { f: JsonNode -> f["id"].asText() }
> .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
> .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") ==
> "delete" }
> {noformat}
> We recently tried upgrading 2.33 to 2.36 and immediately hit a bug in the
> consumer, due to alleged time skew, specifically
> {noformat}
> 2022-03-07 10:48:37.886 GMT Error message from worker:
> java.lang.IllegalArgumentException: Cannot output with timestamp
> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
> timestamp of the
> current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0
> milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
> DoFn#getAllowedTimestampSkew() Javadoc
> for details on changing the allowed skew.
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
>
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
>
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
> {noformat}
> I’ve bisected it and 2.34 works fine, 2.35 is the first version this breaks,
> and it seems like the code in the trace is largely added by the PR linked
> above. The error usually claims a skew of a few seconds, but obviously I
> can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and
> it’s marked deprecated anyway.
> I’m happy to raise a JIRA but I’m not 100% sure what the code was intending
> to fix, and additionally, I’d also be happy if someone else can reproduce
> this or knows of similar reports. I feel like what we’re doing is not that
> uncommon a scenario, so I would have thought someone else would have hit this
> by now.
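For readers following the trace above, the failing check can be modeled outside Beam with a small self-contained sketch. The class and method below are illustrative stand-ins (not the actual `SimpleDoFnRunner` code): in `@ProcessElement`, an output timestamp may not be earlier than the current input's timestamp minus the allowed skew, which defaults to 0 ms, so flushing a batch buffered at an earlier timestamp while processing a later element fails:

```java
import java.time.Instant;

public class SkewCheckSketch {
  /**
   * Models the validation: output timestamps must be no earlier than the
   * current input's timestamp minus the allowed skew.
   */
  static void checkTimestamp(Instant outputTs, Instant currentInputTs, long allowedSkewMillis) {
    Instant earliestAllowed = currentInputTs.minusMillis(allowedSkewMillis);
    if (outputTs.isBefore(earliestAllowed)) {
      throw new IllegalArgumentException(
          "Cannot output with timestamp " + outputTs
              + ". Output timestamps must be no earlier than " + earliestAllowed);
    }
  }

  public static void main(String[] args) {
    // Timestamps taken from the error message quoted above: a batch buffered
    // at 10:43:38.640 is flushed while processing an element at 10:43:43.562.
    Instant buffered = Instant.parse("2022-03-07T10:43:38.640Z");
    Instant current = Instant.parse("2022-03-07T10:43:43.562Z");
    boolean threw = false;
    try {
      checkTimestamp(buffered, current, 0); // zero allowed skew, as in the report
    } catch (IllegalArgumentException expected) {
      threw = true;
    }
    System.out.println(threw); // prints "true": the buffered output is rejected
  }
}
```

This is why the fix moves the flush's output out of the per-element windowing context rather than trying to widen the skew, which would require overriding the deprecated `getAllowedTimestampSkew()`.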
--
This message was sent by Atlassian Jira
(v8.20.7#820007)