[ 
https://issues.apache.org/jira/browse/BEAM-6190?focusedWorklogId=176295&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176295
 ]

ASF GitHub Bot logged work on BEAM-6190:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Dec/18 00:49
            Start Date: 18/Dec/18 00:49
    Worklog Time Spent: 10m 
      Work Description: pabloem closed pull request #7240: [BEAM-6190] Add 
processing stuck message to Pantheon.
URL: https://github.com/apache/beam/pull/7240
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
index b515f0c19591..2e6e07b8776b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
@@ -202,7 +202,7 @@ public ProfileScope getProfileScope() {
     private static final ImmutableSet<String> FRAMEWORK_CLASSES =
         ImmutableSet.of(SimpleDoFnRunner.class.getName(), 
DoFnInstanceManagers.class.getName());
 
-    private String getLullMessage(Thread trackedThread, Duration millis) {
+    protected String getLullMessage(Thread trackedThread, Duration millis) {
       StringBuilder message = new StringBuilder();
       message.append("Processing stuck");
       if (getStepName() != null) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d6de9071d743..e375f24f9ec5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -440,11 +440,11 @@ public int getSize() {
     final Counter<Long, Long> totalProcessingMsecs;
     final Counter<Long, Long> timerProcessingMsecs;
 
-    StageInfo(String stageName, String systemName) {
+    StageInfo(String stageName, String systemName, StreamingDataflowWorker 
worker) {
       this.stageName = stageName;
       this.systemName = systemName;
       metricsContainerRegistry = 
StreamingStepMetricsContainer.createRegistry();
-      executionStateRegistry = new StreamingModeExecutionStateRegistry();
+      executionStateRegistry = new StreamingModeExecutionStateRegistry(worker);
       NameContext nameContext = NameContext.create(stageName, null, 
systemName, null);
       deltaCounters = new CounterSet();
       throttledMsecs =
@@ -1099,7 +1099,7 @@ private void process(
 
     StageInfo stageInfo =
         stageInfoMap.computeIfAbsent(
-            mapTask.getStageName(), s -> new StageInfo(s, 
mapTask.getSystemName()));
+            mapTask.getStageName(), s -> new StageInfo(s, 
mapTask.getSystemName(), this));
 
     ExecutionState executionState = null;
 
@@ -1651,10 +1651,7 @@ private String buildExceptionStackTrace(Throwable t, 
final int maxDepth) {
   // Returns true if reporting the exception is successful and the work should 
be retried.
   private boolean reportFailure(String computation, Windmill.WorkItem work, 
Throwable t) {
     // Enqueue the errors to be sent to DFE in periodic updates
-    synchronized (pendingFailuresToReport) {
-      pendingFailuresToReport.add(
-          buildExceptionStackTrace(t, 
options.getMaxStackTraceDepthToReport()));
-    }
+    addFailure(buildExceptionStackTrace(t, 
options.getMaxStackTraceDepthToReport()));
     if (windmillServiceEnabled) {
       return true;
     } else {
@@ -1670,6 +1667,16 @@ private boolean reportFailure(String computation, 
Windmill.WorkItem work, Throwa
     }
   }
 
+  /**
+   * Adds the given failure message to the queue of messages to be reported to 
DFE in periodic
+   * updates.
+   */
+  public void addFailure(String failureMessage) {
+    synchronized (pendingFailuresToReport) {
+      pendingFailuresToReport.add(failureMessage);
+    }
+  }
+
   private void reportHarnessStartup() {
     DataflowWorkerLoggingMDC.setStageName("startup");
     CounterSet restartCounter = new CounterSet();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index c902b600bd7d..dc7c50222ba9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -133,13 +133,31 @@ public StreamingModeExecutionContext(
     // 2. The reporting thread calls extractUpdate which reads the current sum 
*AND* sets it to 0.
     private final AtomicLong totalMillisInState = new AtomicLong();
 
+    // The worker that created this state.  Used to report lulls back to the 
worker.
+    private final StreamingDataflowWorker worker;
+
     public StreamingModeExecutionState(
         NameContext nameContext,
         String stateName,
         MetricsContainer metricsContainer,
-        ProfileScope profileScope) {
+        ProfileScope profileScope,
+        StreamingDataflowWorker worker) {
       // TODO: Take in the requesting step name and side input index for 
streaming.
       super(nameContext, stateName, null, null, metricsContainer, 
profileScope);
+      this.worker = worker;
+    }
+
+    /*
+     * Report the lull to the StreamingDataflowWorker that is stuck in 
addition to logging the
+     * lull.
+     */
+    @Override
+    public void reportLull(Thread trackedThread, long millis) {
+      super.reportLull(trackedThread, millis);
+      // Also report the failure to the list of pending failures to report on 
the worker thread
+      // so that the failure gets communicated to the StreamingDataflowWorker.
+      String errorMessage = getLullMessage(trackedThread, 
Duration.millis(millis));
+      worker.addFailure(errorMessage);
     }
 
     /**
@@ -172,6 +190,11 @@ public CounterUpdate extractUpdate(boolean isFinalUpdate) {
 
   /** Implementation of ExecutionStateRegistry that creates Streaming versions 
of ExecutionState. */
   public static class StreamingModeExecutionStateRegistry extends 
ExecutionStateRegistry {
+    private final StreamingDataflowWorker worker;
+
+    public StreamingModeExecutionStateRegistry(StreamingDataflowWorker worker) 
{
+      this.worker = worker;
+    }
 
     @Override
     protected DataflowOperationContext.DataflowExecutionState createState(
@@ -181,7 +204,8 @@ public CounterUpdate extractUpdate(boolean isFinalUpdate) {
         Integer inputIndex,
         MetricsContainer container,
         ProfileScope profileScope) {
-      return new StreamingModeExecutionState(nameContext, stateName, 
container, profileScope);
+      return new StreamingModeExecutionState(
+          nameContext, stateName, container, profileScope, worker);
     }
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 9652bf5e40f9..fb5c5c031846 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -85,7 +85,7 @@
   @Mock private WindmillStateReader stateReader;
 
   private StreamingModeExecutionStateRegistry executionStateRegistry =
-      new StreamingModeExecutionStateRegistry();
+      new StreamingModeExecutionStateRegistry(null);
   private StreamingModeExecutionContext executionContext;
 
   @Before
@@ -308,7 +308,11 @@ public void testAtomicExtractUpdate() throws 
InterruptedException, ExecutionExce
 
     StreamingModeExecutionState state =
         new StreamingModeExecutionState(
-            NameContextsForTests.nameContextForTest(), "testState", null, 
NoopProfileScope.NOOP);
+            NameContextsForTests.nameContextForTest(),
+            "testState",
+            null,
+            NoopProfileScope.NOOP,
+            null);
     ExecutorService executor = Executors.newFixedThreadPool(2);
     AtomicBoolean doneWriting = new AtomicBoolean(false);
 
@@ -347,7 +351,11 @@ public void stateSamplingInStreaming() {
     // reach the reading thread.
     StreamingModeExecutionState state =
         new StreamingModeExecutionState(
-            NameContextsForTests.nameContextForTest(), "testState", null, 
NoopProfileScope.NOOP);
+            NameContextsForTests.nameContextForTest(),
+            "testState",
+            null,
+            NoopProfileScope.NOOP,
+            null);
     ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
     try {
       sampler.start();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 20a1c50f7619..93eb71e37d22 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -450,7 +450,7 @@ public void testUnboundedSplits() throws Exception {
   public void testReadUnboundedReader() throws Exception {
     CounterSet counterSet = new CounterSet();
     StreamingModeExecutionStateRegistry executionStateRegistry =
-        new StreamingModeExecutionStateRegistry();
+        new StreamingModeExecutionStateRegistry(null);
     StreamingModeExecutionContext context =
         new StreamingModeExecutionContext(
             counterSet,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 176295)
            Time Spent: 1h 40m  (was: 1.5h)
    Remaining Estimate: 22h 20m  (was: 22.5h)

> "Processing stuck" messages should be visible on Pantheon
> ---------------------------------------------------------
>
>                 Key: BEAM-6190
>                 URL: https://issues.apache.org/jira/browse/BEAM-6190
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow
>    Affects Versions: 2.8.0
>         Environment: Running on Google Cloud Dataflow
>            Reporter: Dustin Rhodes
>            Assignee: Dustin Rhodes
>            Priority: Minor
>             Fix For: Not applicable
>
>   Original Estimate: 24h
>          Time Spent: 1h 40m
>  Remaining Estimate: 22h 20m
>
> When user processing results in an exception, it is clearly visible on the 
> Pantheon landing page for a streaming Dataflow job. But when user processing 
> becomes stuck, there is no indication, even though the worker logs it. Most 
> users don't check worker logs, and checking them is not especially 
> convenient. Ideally a stuck worker would result in a visible error on the 
> Pantheon landing page.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to