This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cac59bc [BEAM-9660]: Add an explicit check for integer overflow. new 05258c7 Merge pull request #11289 from spoortikundargi/patch-1 cac59bc is described below commit cac59bc585d8c7eb71fe4a893b3b337c630287d4 Author: Spoorti Kundargi <spoo...@google.com> AuthorDate: Wed Apr 1 20:12:03 2020 -0700 [BEAM-9660]: Add an explicit check for integer overflow. If `commitSize` is less than zero (due to overflow of integer serialized size), the existing code was setting it to `Integer.MAX_VALUE` and using the `estimatedCommitSize > byteLimit` check to throw an exception. However, in some cases in Dataflow Streaming Applicance, `byteLimit` is set to `Integer.MAX_VALUE` and so the check `estimatedCommitSize > byteLimit` fails to detect integer overflow. --- .../apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f2b0b27..8d2b6a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1376,7 +1376,7 @@ public class StreamingDataflowWorker { // Detect overflow of integer serialized size or if the byte limit was exceeded. windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize); - if (estimatedCommitSize > byteLimit) { + if (commitSize < 0 || commitSize > byteLimit) { KeyCommitTooLargeException e = KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest); reportFailure(computationId, workItem, e);