This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new a01f1a1 [flink] debug logging for state access a01f1a1 is described below commit a01f1a15e00e042137409222580147f5b9218d75 Author: Thomas Weise <t...@apache.org> AuthorDate: Wed Mar 13 21:41:07 2019 -0700 [flink] debug logging for state access --- .../streaming/ExecutableStageDoFnOperator.java | 24 ++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 8b0685b..210da7d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -276,6 +276,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I stateBackendLock.lock(); prepareStateBackend(key, keyCoder); StateNamespace namespace = StateNamespaces.window(windowCoder, window); + if (LOG.isDebugEnabled()) { + LOG.debug( + "State get for {} {} {} {}", + pTransformId, + userStateId, + Arrays.toString(keyedStateBackend.getCurrentKey().array()), + window); + } BagState<V> bagState = stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder)); return bagState.read(); @@ -290,6 +298,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I stateBackendLock.lock(); prepareStateBackend(key, keyCoder); StateNamespace namespace = StateNamespaces.window(windowCoder, window); + if (LOG.isDebugEnabled()) { + LOG.debug( + "State append for {} {} {} {}", + pTransformId, + userStateId, + Arrays.toString(keyedStateBackend.getCurrentKey().array()), + window); + } BagState<V> bagState = stateInternals.state(namespace, 
StateTags.bag(userStateId, valueCoder)); while (values.hasNext()) { @@ -306,6 +322,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I stateBackendLock.lock(); prepareStateBackend(key, keyCoder); StateNamespace namespace = StateNamespaces.window(windowCoder, window); + if (LOG.isDebugEnabled()) { + LOG.debug( + "State clear for {} {} {} {}", + pTransformId, + userStateId, + Arrays.toString(keyedStateBackend.getCurrentKey().array()), + window); + } BagState<V> bagState = stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder)); bagState.clear();