nancyxu825 commented on code in PR #35409: URL: https://github.com/apache/beam/pull/35409#discussion_r2167813550
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionStartRecordAction.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.action; + +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED; + +import com.google.cloud.Timestamp; +import java.util.Optional; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionStartRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is part of the process for {@link + * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF. It is + * responsible for processing {@link PartitionStartRecord}s. The new partition start records will be + * stored in the Connector's metadata tables in order to be scheduled for future querying by the + * {@link org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn} SDF. + */ +public class PartitionStartRecordAction { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionStartRecordAction.class); + private final PartitionMetadataDao partitionMetadataDao; + private final ChangeStreamMetrics metrics; + + /** + * Constructs an action class for handling {@link PartitionStartRecord}s. + * + * @param partitionMetadataDao DAO class to access the Connector's metadata tables + * @param metrics metrics gathering class + */ + PartitionStartRecordAction( + PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics metrics) { + this.partitionMetadataDao = partitionMetadataDao; + this.metrics = metrics; + } + + /** + * This is the main processing function for a {@link PartitionStartRecord}. It returns an {@link + * Optional} of {@link ProcessContinuation} to indicate if the calling function should stop or + * not. If the {@link Optional} returned is empty, it means that the calling function can continue + * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned, + * it means that this function was unable to claim the timestamp of the {@link + * PartitionStartRecord}, so the caller should stop. + * + * <p>When processing the {@link PartitionStartRecord} the following procedure is applied: + * + * <ol> + * <li>We try to claim the partition start record timestamp. 
If it is not possible, we stop here + * and return. + * <li>We update the watermark to the partition start record timestamp. + * <li>For each partition start record, we try to insert them in the metadata tables if they do + * not exist. + * <li>For each partition start record, we increment the corresponding metric. + * </ol> + * + * @param partition the current partition being processed + * @param record the change stream partition start record received + * @param tracker the restriction tracker of the {@link + * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} + * SDF + * @param interrupter the restriction interrupter suggesting early termination of the processing + * @param watermarkEstimator the watermark estimator of the {@link + * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} + * SDF + * @return {@link Optional#empty()} if the caller can continue processing more records. A non + * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable + * to claim the {@link PartitionStartRecord} timestamp. A non empty {@link Optional} with + * {@link ProcessContinuation#resume()} if this function should commit what has already been + * processed and resume. 
+ */ + @VisibleForTesting + public Optional<ProcessContinuation> run( + PartitionMetadata partition, + PartitionStartRecord record, + RestrictionTracker<TimestampRange, Timestamp> tracker, + RestrictionInterrupter<Timestamp> interrupter, + ManualWatermarkEstimator<Instant> watermarkEstimator) { + final String token = partition.getPartitionToken(); + + LOG.debug("[{}] Processing partition start record {}", token, record); + + final Timestamp startTimestamp = record.getStartTimestamp(); + if (interrupter.tryInterrupt(startTimestamp)) { + LOG.debug( + "[{}] Soft deadline reached with partition start records at {}, rescheduling", + token, + startTimestamp); + return Optional.of(ProcessContinuation.resume()); + } + if (!tracker.tryClaim(startTimestamp)) { + LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, startTimestamp); + return Optional.of(ProcessContinuation.stop()); + } + watermarkEstimator.setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); + for (String startPartitionToken : record.getPartitionTokens()) { + processStartPartition(partition, record, startPartitionToken); + } + + LOG.debug("[{}] partition start action completed successfully", token); + return Optional.empty(); + } + + // Unboxing of runInTransaction result will not produce a null value, we can ignore it + @SuppressWarnings("nullness") + private void processStartPartition( + PartitionMetadata partition, PartitionStartRecord record, String startPartitionToken) { + LOG.debug("Processing start partition event {}", startPartitionToken); + + final PartitionMetadata row = + PartitionMetadata.newBuilder() + .setPartitionToken(startPartitionToken) + .setStartTimestamp(record.getStartTimestamp()) + .setEndTimestamp(partition.getEndTimestamp()) + .setHeartbeatMillis(partition.getHeartbeatMillis()) + .setState(CREATED) + .setWatermark(record.getStartTimestamp()) + .build(); + LOG.debug("Inserting start partition token {}", startPartitionToken); + final Boolean 
insertedRow = + partitionMetadataDao + .runInTransaction( Review Comment: This code is probably better placed in PartitionMetadataDao.java ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/PartitionEndRecordAction.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.action; + +import com.google.cloud.Timestamp; +import java.util.Optional; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is part of the process for {@link + * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF. It is + * responsible for processing {@link PartitionEndRecord}s. The records will be used to progress the + * watermark for the current element (partition). Note that after this record, the partition + * will not generate any incoming change records. As a result, the QueryChangeStreamAction will + * mark the partition reading as finished by itself. + */ +public class PartitionEndRecordAction { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionEndRecordAction.class); + private final ChangeStreamMetrics metrics; + + /** + * Constructs an action class for handling {@link PartitionEndRecord}s. + * + * @param metrics metrics gathering class + */ + PartitionEndRecordAction(ChangeStreamMetrics metrics) { + this.metrics = metrics; + } + + /** + * This is the main processing function for a {@link PartitionEndRecord}. 
It returns an {@link + * Optional} of {@link ProcessContinuation} to indicate if the calling function should stop or + * not. If the {@link Optional} returned is empty, it means that the calling function can continue + * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned, + * it means that this function was unable to claim the timestamp of the {@link + * PartitionEndRecord}, so the caller should stop. If an {@link Optional} of {@link + * ProcessContinuation#resume()} is returned, it means that this function should not attempt to + * claim further timestamps of the {@link PartitionEndRecord}, but instead should commit what it + * has processed so far. + * + * <p>When processing the {@link PartitionEndRecord} the following procedure is applied: + * + * <ol> + * <li>We try to claim the partition end record timestamp. If it is not possible, we stop here + * and return. + * <li>We update the necessary metrics. + * <li>We update the watermark to the partition end record timestamp. 
+ * </ol> + */ + @VisibleForTesting + public Optional<ProcessContinuation> run( + PartitionMetadata partition, + PartitionEndRecord record, + RestrictionTracker<TimestampRange, Timestamp> tracker, + RestrictionInterrupter<Timestamp> interrupter, + ManualWatermarkEstimator<Instant> watermarkEstimator) { + + final String token = partition.getPartitionToken(); + LOG.debug("[{}] Processing partition end record {}", token, record); + + final Timestamp timestamp = record.getEndTimestamp(); + final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime()); + if (interrupter.tryInterrupt(timestamp)) { + LOG.debug( + "[{}] Soft deadline reached with partition end record at {}, rescheduling", + token, + timestamp); + return Optional.of(ProcessContinuation.resume()); + } + if (!tracker.tryClaim(timestamp)) { + LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, timestamp); + return Optional.of(ProcessContinuation.stop()); + } + metrics.incPartitionEndRecordCount(); + watermarkEstimator.setWatermark(timestampInstant); Review Comment: Do we not need to mark a partition as FINISHED and populate the finished timestamp? In general, I thought we did TTL on the metadata table based on the FinishedTimestamp? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org