This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a01f1a1  [flink] debug logging for state access
a01f1a1 is described below

commit a01f1a15e00e042137409222580147f5b9218d75
Author: Thomas Weise <t...@apache.org>
AuthorDate: Wed Mar 13 21:41:07 2019 -0700

    [flink] debug logging for state access
---
 .../streaming/ExecutableStageDoFnOperator.java     | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 8b0685b..210da7d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -276,6 +276,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
             stateBackendLock.lock();
             prepareStateBackend(key, keyCoder);
             StateNamespace namespace = StateNamespaces.window(windowCoder, 
window);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "State get for {} {} {} {}",
+                  pTransformId,
+                  userStateId,
+                  Arrays.toString(keyedStateBackend.getCurrentKey().array()),
+                  window);
+            }
             BagState<V> bagState =
                 stateInternals.state(namespace, StateTags.bag(userStateId, 
valueCoder));
             return bagState.read();
@@ -290,6 +298,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
             stateBackendLock.lock();
             prepareStateBackend(key, keyCoder);
             StateNamespace namespace = StateNamespaces.window(windowCoder, 
window);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "State append for {} {} {} {}",
+                  pTransformId,
+                  userStateId,
+                  Arrays.toString(keyedStateBackend.getCurrentKey().array()),
+                  window);
+            }
             BagState<V> bagState =
                 stateInternals.state(namespace, StateTags.bag(userStateId, 
valueCoder));
             while (values.hasNext()) {
@@ -306,6 +322,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
             stateBackendLock.lock();
             prepareStateBackend(key, keyCoder);
             StateNamespace namespace = StateNamespaces.window(windowCoder, 
window);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "State clear for {} {} {} {}",
+                  pTransformId,
+                  userStateId,
+                  Arrays.toString(keyedStateBackend.getCurrentKey().array()),
+                  window);
+            }
             BagState<V> bagState =
                 stateInternals.state(namespace, StateTags.bag(userStateId, 
valueCoder));
             bagState.clear();

Reply via email to