thiagotnunes commented on code in PR #35042:
URL: https://github.com/apache/beam/pull/35042#discussion_r2144247256


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java:
##########
@@ -40,23 +39,40 @@ public class AsyncWatermarkCache implements WatermarkCache {
 
   private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";
   private static final Object MIN_WATERMARK_KEY = new Object();
-  private final LoadingCache<Object, Optional<Timestamp>> cache;
+  private final LoadingCache<Object, Timestamp> cache;
+
+  // This is to cache the result of getUnfinishedMinWatermark query and filter the query in the next
+  // run. For the initial query, the value of this cache is min timestamp. If there is no partition
+  // in the metadata table, then this cache will not be updated. If the getUnfinishedMinWatermark
+  // query fails or times out, then this cache will not be updated.
+  // Note that, all the reload operations on this key are serialized due to use of the single
+  // threaded async reloading executor.
+  private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
 
   public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
     this.cache =
         CacheBuilder.newBuilder()
             .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
             .build(
                 CacheLoader.asyncReloading(
-                    CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
+                    CacheLoader.from(
+                        key -> {
+                          Timestamp unfinishedMinTimes =
+                              dao.getUnfinishedMinWatermarkFrom(lastCachedMinWatermark);
+                          if (unfinishedMinTimes != null
+                              && lastCachedMinWatermark.compareTo(unfinishedMinTimes) < 0) {
+                            lastCachedMinWatermark = unfinishedMinTimes;

Review Comment:
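   `lastCachedMinWatermark` needs to be thread-safe. It is a plain mutable field that is read
   and written inside the loader lambda, and the loader may run on more than one thread (the
   initial load can happen on the calling thread, while reloads run on the async executor).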
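
   One possible way to address this, as a minimal sketch rather than the fix the PR has to
   adopt: keep the value in an `AtomicReference` and advance it with an atomic "keep the
   later of the two" update. `MonotonicWatermark` and `advanceTo` are hypothetical names that
   do not exist in Beam, and `java.time.Instant` stands in for the Spanner `Timestamp` type so
   that the snippet compiles with the JDK alone.

```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not part of the PR): a watermark that only moves forward.
 * Assumption: java.time.Instant is used here in place of com.google.cloud.Timestamp.
 */
final class MonotonicWatermark {
  // AtomicReference gives visibility across threads and atomic read-modify-write.
  private final AtomicReference<Instant> last = new AtomicReference<>(Instant.MIN);

  /** Returns the current value, e.g. to use as the lower bound of the next query. */
  Instant get() {
    return last.get();
  }

  /**
   * Advances the stored value only if {@code candidate} is non-null and strictly later.
   * Returns the value held after the call.
   */
  Instant advanceTo(Instant candidate) {
    if (candidate == null) {
      // Mirrors the "no partition found" case in the diff: leave the value unchanged.
      return last.get();
    }
    return last.accumulateAndGet(
        candidate, (current, next) -> current.compareTo(next) < 0 ? next : current);
  }
}
```

   In the loader this would replace the explicit `compareTo` check and assignment: read the
   bound with `get()`, run the query, then pass the result to `advanceTo(...)`. If the field
   really is only ever touched by one thread at a time, marking it `volatile` may be enough;
   the atomic update is the more defensive choice.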


