scwhittle commented on code in PR #37723:
URL: https://github.com/apache/beam/pull/37723#discussion_r2888836675
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java:
##########
@@ -102,14 +105,16 @@ private Optional<AssembledWorkItem> flushToWorkItem() {
workItemBuilder.build(),
Review Comment:
add the appliedFinalizeIds to the work item builder before building instead
of passing to create? or could remove the list and just add to the builder
where we are currently adding to the list.
Otherwise I had a concern that we were passing a mutable list outside this
class. It is safe since we can see that create below is just iterating and not
keeping a reference to it, but it seems like we might as well do it here so
it's clearer.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -17,71 +17,169 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
-import java.time.Duration;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ThreadSafe
@Internal
final class StreamingCommitFinalizer {
private static final Logger LOG =
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
- private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY =
Duration.ofMinutes(5L);
- private final Cache<Long, Runnable> commitFinalizerCache;
+
+ /** A {@link Runnable} and expiry time pair. */
+ @AutoValue
+ public abstract static class FinalizationInfo {
+ public abstract Long getId();
+
+ public abstract Instant getExpiryTime();
+
+ public abstract Runnable getCallback();
+
+ public static FinalizationInfo create(Long id, Instant expiryTime,
Runnable callback) {
+ return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id,
expiryTime, callback);
+ }
+ }
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition queueMinChanged = lock.newCondition();
+
+ @GuardedBy("lock")
+ private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks =
new HashMap<>();
+
+ @GuardedBy("lock")
+ private final PriorityQueue<FinalizationInfo> cleanUpQueue =
+ new PriorityQueue<>(11,
Comparator.comparing(FinalizationInfo::getExpiryTime));
+
private final BoundedQueueExecutor finalizationExecutor;
- private StreamingCommitFinalizer(
- Cache<Long, Runnable> commitFinalizerCache, BoundedQueueExecutor
finalizationExecutor) {
- this.commitFinalizerCache = commitFinalizerCache;
- this.finalizationExecutor = finalizationExecutor;
+ private StreamingCommitFinalizer(BoundedQueueExecutor
finalizationCleanupExecutor) {
+ finalizationExecutor = finalizationCleanupExecutor;
+ finalizationCleanupExecutor.execute(this::cleanupThreadBody, 0);
+ }
+
+ private void cleanupThreadBody() {
+ lock.lock();
+ try {
+ while (true) {
+ final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
+ if (minValue == null) {
+ // Wait for an element to be added and loop to re-examine the min.
+ queueMinChanged.await();
+ continue;
+ }
+
+ Instant now = Instant.now();
+ Duration timeDifference = new Duration(now, minValue.getExpiryTime());
+ if (timeDifference.getMillis() < 0
+ || (queueMinChanged.await(timeDifference.getMillis(),
TimeUnit.MILLISECONDS)
+ && cleanUpQueue.peek() == minValue)) {
+ // The minimum element has an expiry time before now, either because
it had elapsed when
+ // we pulled it or because we awaited it, and it is still the
minimum.
+ checkState(minValue == cleanUpQueue.poll());
+ checkState(commitFinalizationCallbacks.remove(minValue.getId()) ==
minValue);
+ }
+ }
+ } catch (InterruptedException e) {
+ // We're being shutdown.
+ } finally {
+ lock.unlock();
+ }
}
static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
- return new StreamingCommitFinalizer(
-
CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
- workExecutor);
+ return new StreamingCommitFinalizer(workExecutor);
}
/**
* Stores a map of user worker generated finalization ids and callbacks to
execute once a commit
* has been successfully committed to the backing state store.
*/
- void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
- commitFinalizerCache.putAll(commitCallbacks);
+ public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>>
callbacks) {
+ for (Map.Entry<Long, Pair<Instant, Runnable>> entry :
callbacks.entrySet()) {
+ Long finalizeId = entry.getKey();
+ final FinalizationInfo info =
+ FinalizationInfo.create(
+ finalizeId, entry.getValue().getLeft(),
entry.getValue().getRight());
+
+ lock.lock();
+ try {
+ FinalizationInfo existingInfo =
commitFinalizationCallbacks.put(finalizeId, info);
+ if (existingInfo != null) {
+ throw new IllegalStateException(
+ "Expected to not have any past callbacks for bundle "
+ + finalizeId
+ + " but had "
+ + existingInfo);
+ }
+ cleanUpQueue.add(info);
+ @SuppressWarnings("ReferenceEquality")
+ boolean newMin = cleanUpQueue.peek() == info;
+ if (newMin) {
+ queueMinChanged.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
/**
* When this method is called, the commits associated with the provided
finalizeIds have been
* successfully persisted in the backing state store. If the commitCallback
for the finalizationId
* is still cached it is invoked.
*/
- void finalizeCommits(Iterable<Long> finalizeIds) {
- for (long finalizeId : finalizeIds) {
- @Nullable Runnable finalizeCommit =
commitFinalizerCache.getIfPresent(finalizeId);
- // NOTE: It is possible the same callback id may be removed twice if
- // windmill restarts.
- // TODO: It is also possible for an earlier finalized id to be lost.
- // We should automatically discard all older callbacks for the same
computation and key.
- if (finalizeCommit != null) {
- commitFinalizerCache.invalidate(finalizeId);
- finalizationExecutor.forceExecute(
- () -> {
- try {
- finalizeCommit.run();
- } catch (OutOfMemoryError oom) {
- throw oom;
- } catch (Throwable t) {
- LOG.error("Source checkpoint finalization failed:", t);
- }
- },
- 0);
+ public void finalizeCommits(Iterable<Long> finalizeIds) {
+ List<Runnable> callbacksToExecute = new ArrayList<>();
Review Comment:
Early return if finalizeIds is empty so we don't have to grab the lock. It
seems we aren't checking that at the call site, and it will very often be empty.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -17,71 +17,169 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
-import java.time.Duration;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ThreadSafe
@Internal
final class StreamingCommitFinalizer {
private static final Logger LOG =
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
- private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY =
Duration.ofMinutes(5L);
- private final Cache<Long, Runnable> commitFinalizerCache;
+
+ /** A {@link Runnable} and expiry time pair. */
+ @AutoValue
+ public abstract static class FinalizationInfo {
+ public abstract Long getId();
+
+ public abstract Instant getExpiryTime();
+
+ public abstract Runnable getCallback();
+
+ public static FinalizationInfo create(Long id, Instant expiryTime,
Runnable callback) {
+ return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id,
expiryTime, callback);
+ }
+ }
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition queueMinChanged = lock.newCondition();
+
+ @GuardedBy("lock")
+ private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks =
new HashMap<>();
+
+ @GuardedBy("lock")
+ private final PriorityQueue<FinalizationInfo> cleanUpQueue =
+ new PriorityQueue<>(11,
Comparator.comparing(FinalizationInfo::getExpiryTime));
+
private final BoundedQueueExecutor finalizationExecutor;
- private StreamingCommitFinalizer(
- Cache<Long, Runnable> commitFinalizerCache, BoundedQueueExecutor
finalizationExecutor) {
- this.commitFinalizerCache = commitFinalizerCache;
- this.finalizationExecutor = finalizationExecutor;
+ private StreamingCommitFinalizer(BoundedQueueExecutor
finalizationCleanupExecutor) {
+ finalizationExecutor = finalizationCleanupExecutor;
+ finalizationCleanupExecutor.execute(this::cleanupThreadBody, 0);
+ }
+
+ private void cleanupThreadBody() {
+ lock.lock();
+ try {
+ while (true) {
+ final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
+ if (minValue == null) {
+ // Wait for an element to be added and loop to re-examine the min.
+ queueMinChanged.await();
+ continue;
+ }
+
+ Instant now = Instant.now();
+ Duration timeDifference = new Duration(now, minValue.getExpiryTime());
+ if (timeDifference.getMillis() < 0
+ || (queueMinChanged.await(timeDifference.getMillis(),
TimeUnit.MILLISECONDS)
+ && cleanUpQueue.peek() == minValue)) {
+ // The minimum element has an expiry time before now, either because
it had elapsed when
+ // we pulled it or because we awaited it, and it is still the
minimum.
+ checkState(minValue == cleanUpQueue.poll());
+ checkState(commitFinalizationCallbacks.remove(minValue.getId()) ==
minValue);
+ }
+ }
+ } catch (InterruptedException e) {
+ // We're being shutdown.
+ } finally {
+ lock.unlock();
+ }
}
static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
- return new StreamingCommitFinalizer(
-
CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
- workExecutor);
+ return new StreamingCommitFinalizer(workExecutor);
}
/**
* Stores a map of user worker generated finalization ids and callbacks to
execute once a commit
* has been successfully committed to the backing state store.
*/
- void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
- commitFinalizerCache.putAll(commitCallbacks);
+ public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>>
callbacks) {
+ for (Map.Entry<Long, Pair<Instant, Runnable>> entry :
callbacks.entrySet()) {
+ Long finalizeId = entry.getKey();
+ final FinalizationInfo info =
+ FinalizationInfo.create(
+ finalizeId, entry.getValue().getLeft(),
entry.getValue().getRight());
+
+ lock.lock();
+ try {
+ FinalizationInfo existingInfo =
commitFinalizationCallbacks.put(finalizeId, info);
+ if (existingInfo != null) {
+ throw new IllegalStateException(
+ "Expected to not have any past callbacks for bundle "
+ + finalizeId
+ + " but had "
+ + existingInfo);
+ }
+ cleanUpQueue.add(info);
+ @SuppressWarnings("ReferenceEquality")
+ boolean newMin = cleanUpQueue.peek() == info;
+ if (newMin) {
+ queueMinChanged.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
/**
* When this method is called, the commits associated with the provided
finalizeIds have been
* successfully persisted in the backing state store. If the commitCallback
for the finalizationId
* is still cached it is invoked.
*/
- void finalizeCommits(Iterable<Long> finalizeIds) {
- for (long finalizeId : finalizeIds) {
- @Nullable Runnable finalizeCommit =
commitFinalizerCache.getIfPresent(finalizeId);
- // NOTE: It is possible the same callback id may be removed twice if
- // windmill restarts.
- // TODO: It is also possible for an earlier finalized id to be lost.
- // We should automatically discard all older callbacks for the same
computation and key.
- if (finalizeCommit != null) {
- commitFinalizerCache.invalidate(finalizeId);
- finalizationExecutor.forceExecute(
- () -> {
- try {
- finalizeCommit.run();
- } catch (OutOfMemoryError oom) {
- throw oom;
- } catch (Throwable t) {
- LOG.error("Source checkpoint finalization failed:", t);
- }
- },
- 0);
+ public void finalizeCommits(Iterable<Long> finalizeIds) {
+ List<Runnable> callbacksToExecute = new ArrayList<>();
+ lock.lock();
+ try {
+ for (long finalizeId : finalizeIds) {
+ @Nullable FinalizationInfo info =
commitFinalizationCallbacks.remove(finalizeId);
+ if (info != null) {
+ checkState(cleanUpQueue.remove(info));
+ callbacksToExecute.add(info.getCallback());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ for (Runnable callback : callbacksToExecute) {
+ try {
+ finalizationExecutor.execute(callback, 0);
Review Comment:
Why the 0? Can the execute(runnable) method just be called instead?
##########
runners/google-cloud-dataflow-java/build.gradle:
##########
Review Comment:
I think you need to add an exclusion here for the batch runner
(you could also modify the trigger file for this one to validate).
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -17,71 +17,169 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
-import java.time.Duration;
+import static com.google.common.base.Preconditions.checkState;
Review Comment:
I think this needs to be the vendored one?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+ private StreamingCommitFinalizer finalizer;
+ private BoundedQueueExecutor executor;
+
+ @Before
+ public void setUp() {
+ executor =
+ new BoundedQueueExecutor(
+ 10,
+ 60,
+ TimeUnit.SECONDS,
+ 10,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("FinalizationCallback-%d")
+ .setDaemon(true)
+ .build(),
+ /*useFairMonitor=*/ false);
+ finalizer = StreamingCommitFinalizer.create(executor);
+ }
+
+ @Test
+ public void testCreateAndInit() {
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCacheCommitFinalizer() {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ assertEquals(1, finalizer.cleanupQueueSize());
+ verify(callback, never()).run();
+ }
+
+ @Test
+ public void testThrowErrorOnDuplicateIds() {
+ Runnable callback1 = mock(Runnable.class);
+ Instant expiry = Instant.now().plus(Duration.standardHours(1));
+ finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry,
callback1)));
+
+ Runnable callback2 = mock(Runnable.class);
+ Map<Long, Pair<Instant, Runnable>> duplicateCallback =
+ ImmutableMap.of(1L, Pair.of(expiry, callback2));
+ assertThrows(
+ IllegalStateException.class, () ->
finalizer.cacheCommitFinalizers(duplicateCallback));
+ }
+
+ @Test
+ public void testFinalizeCommits() throws Exception {
+ CountDownLatch callbackExecuted = new CountDownLatch(1);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(
+ 1L,
+ Pair.of(
+ Instant.now().plus(Duration.standardHours(1)),
+ () -> callbackExecuted.countDown())));
+ finalizer.finalizeCommits(Collections.singletonList(1L));
+ assertTrue(callbackExecuted.await(30, TimeUnit.SECONDS));
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testMultipleCommits() throws Exception {
+ CountDownLatch callback1Executed = new CountDownLatch(1);
+ CountDownLatch callback2Executed = new CountDownLatch(1);
+ CountDownLatch callback3Executed = new CountDownLatch(1);
+
+ Instant expiryTime = Instant.now().plus(Duration.standardHours(1));
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+ .put(1L, Pair.of(expiryTime, () -> callback1Executed.countDown()))
+ .put(2L, Pair.of(expiryTime, () -> callback2Executed.countDown()))
+ .put(3L, Pair.of(expiryTime, () -> callback3Executed.countDown()))
+ .build());
+ // Finalize commits one at a time (in different order from added).
+ finalizer.finalizeCommits(Collections.singletonList(2L));
+ assertTrue(callback2Executed.await(30, TimeUnit.SECONDS));
+
+ finalizer.finalizeCommits(Collections.singletonList(3L));
+ assertTrue(callback3Executed.await(30, TimeUnit.SECONDS));
+
+ finalizer.finalizeCommits(Collections.singletonList(1L));
+ assertTrue(callback1Executed.await(30, TimeUnit.SECONDS));
+
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testIgnoresUnknownIds() throws Exception {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ finalizer.finalizeCommits(Collections.singletonList(2L));
+ while (executor.elementsOutstanding() > 1) {
Review Comment:
Can you just assert that it is 1 here instead of using a loop?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -1043,9 +1072,41 @@ public TimerInternals timerInternals() {
return checkNotNull(systemTimerInternals);
}
+ @Override
+ public BundleFinalizer bundleFinalizer() {
+ return bundleFinalizer;
+ }
+
public TimerInternals userTimerInternals() {
ensureStateful("Tried to access user timers");
return checkNotNull(userTimerInternals);
}
+
+ public List<Pair<Instant, BundleFinalizer.Callback>>
getBundleFinalizerCallbacks() {
Review Comment:
This is returning mutable internal state, though it seems we only call this
just before clearing. How about making this a flushBundleFinalizerCallbacks
method that combines both? It could use an ImmutableList.Builder and reset it
only if non-empty.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+ private StreamingCommitFinalizer finalizer;
+ private BoundedQueueExecutor executor;
+
+ @Before
+ public void setUp() {
+ executor =
+ new BoundedQueueExecutor(
+ 10,
+ 60,
+ TimeUnit.SECONDS,
+ 10,
+ 10000000,
+ new ThreadFactoryBuilder()
+ .setNameFormat("FinalizationCallback-%d")
+ .setDaemon(true)
+ .build(),
+ /*useFairMonitor=*/ false);
+ finalizer = StreamingCommitFinalizer.create(executor);
+ }
+
+ @Test
+ public void testCreateAndInit() {
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCacheCommitFinalizer() {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ assertEquals(1, finalizer.cleanupQueueSize());
+ verify(callback, never()).run();
+ }
+
+ @Test
+ public void testThrowErrorOnDuplicateIds() {
+ Runnable callback1 = mock(Runnable.class);
+ Instant expiry = Instant.now().plus(Duration.standardHours(1));
+ finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry,
callback1)));
+
+ Runnable callback2 = mock(Runnable.class);
+ Map<Long, Pair<Instant, Runnable>> duplicateCallback =
+ ImmutableMap.of(1L, Pair.of(expiry, callback2));
+ assertThrows(
+ IllegalStateException.class, () ->
finalizer.cacheCommitFinalizers(duplicateCallback));
+ }
+
+ @Test
+ public void testFinalizeCommits() throws Exception {
+ CountDownLatch callbackExecuted = new CountDownLatch(1);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(
+ 1L,
+ Pair.of(
+ Instant.now().plus(Duration.standardHours(1)),
+ () -> callbackExecuted.countDown())));
+ finalizer.finalizeCommits(Collections.singletonList(1L));
+ assertTrue(callbackExecuted.await(30, TimeUnit.SECONDS));
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testMultipleCommits() throws Exception {
+ CountDownLatch callback1Executed = new CountDownLatch(1);
+ CountDownLatch callback2Executed = new CountDownLatch(1);
+ CountDownLatch callback3Executed = new CountDownLatch(1);
+
+ Instant expiryTime = Instant.now().plus(Duration.standardHours(1));
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+ .put(1L, Pair.of(expiryTime, () -> callback1Executed.countDown()))
+ .put(2L, Pair.of(expiryTime, () -> callback2Executed.countDown()))
+ .put(3L, Pair.of(expiryTime, () -> callback3Executed.countDown()))
+ .build());
+ // Finalize commits one at a time (in different order from added).
+ finalizer.finalizeCommits(Collections.singletonList(2L));
+ assertTrue(callback2Executed.await(30, TimeUnit.SECONDS));
+
+ finalizer.finalizeCommits(Collections.singletonList(3L));
+ assertTrue(callback3Executed.await(30, TimeUnit.SECONDS));
+
+ finalizer.finalizeCommits(Collections.singletonList(1L));
+ assertTrue(callback1Executed.await(30, TimeUnit.SECONDS));
+
+ assertEquals(0, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testIgnoresUnknownIds() throws Exception {
+ Runnable callback = mock(Runnable.class);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(1L,
Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+ finalizer.finalizeCommits(Collections.singletonList(2L));
+ while (executor.elementsOutstanding() > 1) {
+ Thread.sleep(200);
+ }
+ verify(callback, never()).run();
+ assertEquals(1, finalizer.cleanupQueueSize());
+ }
+
+ @Test
+ public void testCleanupOnExpiration() throws Exception {
+ CountDownLatch callback1Executed = new CountDownLatch(1);
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.of(
+ 1L,
+ Pair.of(
+ Instant.now().plus(Duration.standardHours(1)),
+ () -> callback1Executed.countDown())));
+ assertEquals(1, finalizer.cleanupQueueSize());
+
+ Runnable callback2 = mock(Runnable.class);
+ Runnable callback3 = mock(Runnable.class);
+ Instant shortTimeout = Instant.now().plus(Duration.millis(100));
+ finalizer.cacheCommitFinalizers(
+ ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+ .put(2L, Pair.of(shortTimeout, callback2))
+ .put(3L, Pair.of(shortTimeout, callback3))
+ .build());
+
+ while (finalizer.cleanupQueueSize() > 1) {
+ // Wait until the two 100ms timeouts expire.
+ Thread.sleep(200);
+ }
+ // We can call finalize even though these were already cleaned up.
+ finalizer.finalizeCommits(ImmutableList.of(2L, 3L));
+ while (executor.elementsOutstanding() > 1) {
Review Comment:
Could do this before the finalizeCommits call above, and assert that it is 1
after calling it instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]