thiagotnunes commented on code in PR #35042:
URL: https://github.com/apache/beam/pull/35042#discussion_r2144247256
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java:
##########
@@ -40,23 +39,40 @@ public class AsyncWatermarkCache implements WatermarkCache {
   private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";
   private static final Object MIN_WATERMARK_KEY = new Object();
-  private final LoadingCache<Object, Optional<Timestamp>> cache;
+  private final LoadingCache<Object, Timestamp> cache;
+
+  // This is to cache the result of getUnfinishedMinWatermark query and filter the query in the next
+  // run. For the initial query, the value of this cache is min timestamp. If there is no partition
+  // in the metadata table, then this cache will not be updated. If the getUnfinishedMinWatermark
+  // query fails or times out, then this cache will not be updated.
+  // Note that, all the reload operations on this key are serialized due to use of the single
+  // threaded async reloading executor.
+  private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;

   public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
     this.cache =
         CacheBuilder.newBuilder()
             .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
             .build(
                 CacheLoader.asyncReloading(
-                    CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
+                    CacheLoader.from(
+                        key -> {
+                          Timestamp unfinishedMinTimes =
+                              dao.getUnfinishedMinWatermarkFrom(lastCachedMinWatermark);
+                          if (unfinishedMinTimes != null
+                              && lastCachedMinWatermark.compareTo(unfinishedMinTimes) < 0) {
+                            lastCachedMinWatermark = unfinishedMinTimes;

Review Comment:
lastCachedMinWatermark needs to be thread-safe. (Only reloads run on the single-threaded async reloading executor; the initial load runs on whichever thread first calls the cache, so the field is read and written from more than one thread — e.g. make it volatile or an AtomicReference.)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org