sjvanrossum commented on code in PR #34331:
URL: https://github.com/apache/beam/pull/34331#discussion_r2054377745
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -262,82 +333,133 @@ private static final class SharedStateHolder {
* fetch backlog.
*/
private static class KafkaLatestOffsetEstimator
- implements GrowableOffsetRangeTracker.RangeEndEstimator {
-
+ implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable {
+ private static final
AtomicReferenceFieldUpdater<KafkaLatestOffsetEstimator, @Nullable Runnable>
+ CURRENT_REFRESH_TASK =
+ (AtomicReferenceFieldUpdater<KafkaLatestOffsetEstimator, @Nullable
Runnable>)
+ AtomicReferenceFieldUpdater.newUpdater(
+ KafkaLatestOffsetEstimator.class, Runnable.class,
"currentRefreshTask");
+ private final Executor executor;
private final Consumer<byte[], byte[]> offsetConsumer;
private final TopicPartition topicPartition;
- private final Supplier<Long> memoizedBacklog;
+ private long lastRefreshEndOffset;
+ private long nextRefreshNanos;
+ private volatile @Nullable Runnable currentRefreshTask;
KafkaLatestOffsetEstimator(
- Consumer<byte[], byte[]> offsetConsumer, TopicPartition
topicPartition) {
+ final Consumer<byte[], byte[]> offsetConsumer, final TopicPartition
topicPartition) {
+ this.executor = Executors.newSingleThreadExecutor();
this.offsetConsumer = offsetConsumer;
this.topicPartition = topicPartition;
- memoizedBacklog =
- Suppliers.memoizeWithExpiration(
- () -> {
- synchronized (offsetConsumer) {
- return Preconditions.checkStateNotNull(
- offsetConsumer
- .endOffsets(Collections.singleton(topicPartition))
- .get(topicPartition),
- "No end offset found for partition %s.",
- topicPartition);
- }
- },
- 1,
- TimeUnit.SECONDS);
+ this.lastRefreshEndOffset = -1L;
+ this.nextRefreshNanos = Long.MIN_VALUE;
+ this.currentRefreshTask = null;
}
@Override
- protected void finalize() {
- try {
- Closeables.close(offsetConsumer, true);
- LOG.info("Offset Estimator consumer was closed for {}",
topicPartition);
- } catch (Exception anyException) {
- LOG.warn("Failed to close offset consumer for {}", topicPartition);
+ public long estimate() {
+ final @Nullable Runnable task = currentRefreshTask; // volatile load
(acquire)
Review Comment:
Added a long explanation, including some notes about a known atomicity bug.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org