pabloem commented on code in PR #25411:
URL: https://github.com/apache/beam/pull/25411#discussion_r1105229153


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java:
##########
@@ -47,12 +57,34 @@ public GenerateInitialPartitionsAction(
    * The very first step of the pipeline when there are no partitions being streamed yet. We want to
    * get an initial list of partitions to stream and output them.
    *
-   * @return true if this pipeline should continue, otherwise false.
+   * @return {@link ProcessContinuation#resume()} if the stream continues, otherwise {@link
+   *     ProcessContinuation#stop()}
    */
-  public boolean run(
+  public ProcessContinuation run(
       OutputReceiver<PartitionRecord> receiver,
+      RestrictionTracker<OffsetRange, Long> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      Timestamp startTime) {
-    return true;
+      com.google.cloud.Timestamp startTime) {
+    if (!tracker.tryClaim(0L)) {
+      LOG.error(
+          "Could not claim initial DetectNewPartition restriction. No partitions are outputted.");
+      return ProcessContinuation.stop();
+    }
+    List<ByteStringRange> streamPartitions =
+        changeStreamDao.generateInitialChangeStreamPartitions();
+
+    watermarkEstimator.setWatermark(TimestampConverter.toInstant(startTime));
+
+    for (ByteStringRange partition : streamPartitions) {
+      metrics.incListPartitionsCount();
+      String uid = UniqueIdGenerator.getNextId();
+      PartitionRecord partitionRecord =
+          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);

Review Comment:
   Have you tested this? Your comment addresses my main question, but I am curious why you'd need this if you already have appropriate watermark information coming from the record itself?
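
   For context on the trade-off being asked about: the action sets the watermark to `startTime` but stamps every `PartitionRecord` with `Instant.EPOCH`, so each record sits far behind the watermark. The sketch below is a simplified, hypothetical model written with plain `java.time`. It does not use Beam's API and does not reproduce Beam's actual late-data logic. It assumes fixed one-minute windows aligned to the epoch, zero allowed lateness, and an illustrative start time. Under those assumptions, an EPOCH-stamped element lands in a window that has already expired relative to the watermark, while a `startTime`-stamped element does not.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model (not Beam code) of how an element's event timestamp
 * relates to the watermark once elements are assigned to fixed event-time windows.
 */
public class LatenessSketch {

  /** Start of the fixed window of the given size that contains ts (windows aligned to EPOCH). */
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMillis = size.toMillis();
    long millis = ts.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
  }

  /**
   * Simplified rule: the element's window is treated as expired when the window's last
   * instant plus the allowed lateness is before the watermark.
   */
  static boolean isDroppable(
      Instant ts, Duration windowSize, Duration allowedLateness, Instant watermark) {
    Instant windowMaxTimestamp = windowStart(ts, windowSize).plus(windowSize).minusMillis(1);
    return windowMaxTimestamp.plus(allowedLateness).isBefore(watermark);
  }

  public static void main(String[] args) {
    // Hypothetical stream start time; in the PR this value comes from startTime.
    Instant startTime = Instant.parse("2023-02-10T00:00:00Z");
    // Mirrors watermarkEstimator.setWatermark(...startTime...) in the diff.
    Instant watermark = startTime;
    Duration window = Duration.ofMinutes(1);
    Duration lateness = Duration.ZERO;

    // prints "EPOCH timestamp droppable: true"
    System.out.println(
        "EPOCH timestamp droppable: "
            + isDroppable(Instant.EPOCH, window, lateness, watermark));
    // prints "startTime timestamp droppable: false"
    System.out.println(
        "startTime timestamp droppable: " + isDroppable(startTime, window, lateness, watermark));
  }
}
```

   In a global window nothing is ever dropped by this rule, which is one reason stamping with EPOCH can still be workable when downstream steps do not window on event time.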


