This is an automated email from the ASF dual-hosted git repository. yichi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 86ad8af4da1 [BEAM-14253] patch SubscriptionPartitionLoader to work around a dataflow bug (#17523)
86ad8af4da1 is described below

commit 86ad8af4da189fdefc387d29a194bb6f6ddbf3f3
Author: dpcollins-google <40498610+dpcollins-goo...@users.noreply.github.com>
AuthorDate: Mon May 2 14:12:33 2022 -0400

    [BEAM-14253] patch SubscriptionPartitionLoader to work around a dataflow bug (#17523)
---
 .../io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java | 7 ++++---
 .../java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java | 2 --
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
index 3a21f85be99..1d04cdd3575 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -73,8 +72,10 @@ class SubscriptionPartitionLoader extends PTransform<PBegin, PCollection<Subscri
 
   @GetInitialWatermarkEstimatorState
   public Instant getInitialWatermarkEstimatorState(@Timestamp Instant initial) {
-    checkArgument(initial.equals(BoundedWindow.TIMESTAMP_MIN_VALUE));
-    return initial;
+    // TODO: Add back when dataflow is fixed.
+    // checkArgument(initial.equals(BoundedWindow.TIMESTAMP_MIN_VALUE));
+    // return initial;
+    return Instant.EPOCH;
   }
 
   @GetInitialRestriction
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
index 60d11cc712f..c02bd97241a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -61,7 +61,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
 import org.joda.time.Duration;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -227,7 +226,6 @@ public class ReadWriteIT {
   }
 
   @Test
-  @Ignore("BEAM-14253")
   public void testReadWrite() throws Exception {
     pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
     pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);