This is an automated email from the ASF dual-hosted git repository. johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 205cbcea9b0 Return unknown backlog on Kinesis if reader not started (#26953) 205cbcea9b0 is described below commit 205cbcea9b030ebd5c2dc6bd6479c5c381248aab Author: Zachary Houfek <83302349+zhou...@users.noreply.github.com> AuthorDate: Wed May 31 15:54:59 2023 -0400 Return unknown backlog on Kinesis if reader not started (#26953) --- CHANGES.md | 1 + .../java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java | 6 ++++++ .../java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java | 6 ++++++ 3 files changed, 13 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 79fdcaf7f19..af9855057d6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -75,6 +75,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed KinesisIO `NullPointerException` when a progress check is made before the reader is started (IO) ([#23868](https://github.com/apache/beam/issues/23868)) ## Known Issues diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java index 77184ba5ab8..1ca37d23cd8 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java @@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import java.io.IOException; import java.util.NoSuchElementException; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.joda.time.Duration; @@ -159,6 +160,11 @@ class KinesisReader extends 
UnboundedSource.UnboundedReader<KinesisRecord> { */ @Override public long getSplitBacklogBytes() { + // Safety check in case a progress check is made before the start method is called. + if (shardReadersPool == null) { + return UnboundedReader.BACKLOG_UNKNOWN; + } + Instant latestRecordTimestamp = shardReadersPool.getLatestRecordTimestamp(); if (latestRecordTimestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) { diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java index 7674b66f329..c063e817244 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.NoSuchElementException; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.joda.time.Duration; import org.joda.time.Instant; @@ -150,6 +151,11 @@ public class KinesisReaderTest { assertThat(reader.getSplitBacklogBytes()).isEqualTo(20); } + @Test + public void getSplitBacklogBytesShouldReturnUnknownIfNotStarted() { + assertThat(reader.getSplitBacklogBytes()).isEqualTo(UnboundedReader.BACKLOG_UNKNOWN); + } + @Test public void getSplitBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently() throws TransientKinesisException, IOException {