chenxuesdu commented on code in PR #36456:
URL: https://github.com/apache/beam/pull/36456#discussion_r2418083675
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -528,14 +528,23 @@ public Void updateToFinished(String partitionToken) {
}
/**
- * Update the partition watermark to the given timestamp.
+ * Update the partition watermark to the given timestamp if the given
timestamp is larger than
+ * the existing value.
*
* @param partitionToken the partition unique identifier
* @param watermark the new partition watermark
* @return the commit timestamp of the read / write transaction
*/
public Void updateWatermark(String partitionToken, Timestamp watermark) {
-
transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken,
watermark));
+ final Struct partition = getPartition(partitionToken);
+ if (partition == null) {
+ LOG.debug("Partiton {} cannot find.", partitionToken);
+ return null;
+ }
+ final Timestamp currentWatermark =
partition.getTimestamp(COLUMN_WATERMARK);
+ if (watermark.compareTo(currentWatermark) > 0) {
Review Comment:
This compare happens transaction. The function updatewermark is in
transaction:
public void updateWatermark(String partitionToken, Timestamp watermark) {
runInTransaction(
transaction -> transaction.updateWatermark(partitionToken,
watermark), "updateWatermark");
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]