This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 80eff6c [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
     new a9a1c8e Merge pull request #13507 from [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
80eff6c is described below

commit 80eff6c571fd34c9a4b1024ff4201c1e2dd23dad
Author: Boyuan Zhang <boyu...@google.com>
AuthorDate: Tue Dec 8 11:00:20 2020 -0800

    [BEAM-11417] Use Cache with time eviction policy for commitCallbacks
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6c127c8..6771e04 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -138,6 +138,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.EvictingQueue;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -397,8 +399,11 @@ public class StreamingDataflowWorker {
       new WeightedBoundedQueue<>(
           MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
 
-  // Map of tokens to commit callbacks.
-  private final ConcurrentMap<Long, Runnable> commitCallbacks = new ConcurrentHashMap<>();
+  // Cache of tokens to commit callbacks.
+  // Using Cache with time eviction policy helps us to prevent memory leak when callback ids are
+  // discarded by Dataflow service and calling commitCallback is best-effort.
+  private final Cache<Long, Runnable> commitCallbacks =
+      CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build();
 
   // Map of user state names to system state names.
   // TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily in
@@ -1200,12 +1205,13 @@ public class StreamingDataflowWorker {
 
   private void callFinalizeCallbacks(Windmill.WorkItem work) {
     for (Long callbackId : work.getSourceState().getFinalizeIdsList()) {
-      final Runnable callback = commitCallbacks.remove(callbackId);
+      final Runnable callback = commitCallbacks.getIfPresent(callbackId);
       // NOTE: It is possible the same callback id may be removed twice if
       // windmill restarts.
       // TODO: It is also possible for an earlier finalized id to be lost.
       // We should automatically discard all older callbacks for the same computation and key.
       if (callback != null) {
+        commitCallbacks.invalidate(callbackId);
         workUnitExecutor.forceExecute(
             () -> {
               try {