This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ad8af4da1 [BEAM-14253] patch SubscriptionPartitionLoader to work around a dataflow bug (#17523)
86ad8af4da1 is described below

commit 86ad8af4da189fdefc387d29a194bb6f6ddbf3f3
Author: dpcollins-google <40498610+dpcollins-goo...@users.noreply.github.com>
AuthorDate: Mon May 2 14:12:33 2022 -0400

    [BEAM-14253] patch SubscriptionPartitionLoader to work around a dataflow bug (#17523)
---
 .../io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java    | 7 ++++---
 .../java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java    | 2 --
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
index 3a21f85be99..1d04cdd3575 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -73,8 +72,10 @@ class SubscriptionPartitionLoader extends PTransform<PBegin, PCollection<Subscri
 
     @GetInitialWatermarkEstimatorState
     public Instant getInitialWatermarkEstimatorState(@Timestamp Instant initial) {
-      checkArgument(initial.equals(BoundedWindow.TIMESTAMP_MIN_VALUE));
-      return initial;
+      // TODO: Add back when dataflow is fixed.
+      // checkArgument(initial.equals(BoundedWindow.TIMESTAMP_MIN_VALUE));
+      // return initial;
+      return Instant.EPOCH;
     }
 
     @GetInitialRestriction
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
index 60d11cc712f..c02bd97241a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -61,7 +61,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
 import org.joda.time.Duration;
 import org.junit.After;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -227,7 +226,6 @@ public class ReadWriteIT {
   }
 
   @Test
-  @Ignore("BEAM-14253")
   public void testReadWrite() throws Exception {
     pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
     pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);

Reply via email to