This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 220cbe2  SAMZA-2611: [AM-HA] heartbeat reestablish causes container's 
heartbeat thread to die (#1452)
220cbe2 is described below

commit 220cbe29911718b5b49509d3f42e8851237f6249
Author: lakshmi-manasa-g <mgadup...@linkedin.com>
AuthorDate: Mon Dec 7 07:50:25 2020 -0800

    SAMZA-2611: [AM-HA] heartbeat reestablish causes container's heartbeat 
thread to die (#1452)
    
    Symptom: When the new AM takes a long time to start up, an already running 
container's heartbeat thread silently dies and does not make any heartbeat 
requests to the new AM.
    
    Cause: The AM url (yarn.am.tracking.url) key-value is removed from the 
Coordinator stream when the new AM is starting up, because this config is 
present in the old config (aka coordinator stream) but not in the new 
AM-generated config. As a result, the running container, which is constantly 
fetching the value for this key, gets a null and throws an NPE.
    
    Changes: When AM-HA is enabled, do not remove the AM url config.
---
 .../samza/container/ContainerHeartbeatMonitor.java | 30 +++++++++++++++-------
 .../apache/samza/util/CoordinatorStreamUtil.scala  |  6 +++++
 .../container/TestContainerHeartbeatMonitor.java   | 24 +++++++++++++++++
 3 files changed, 51 insertions(+), 9 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 7e20f26..0eba766 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
@@ -92,17 +93,19 @@ public class ContainerHeartbeatMonitor {
       if (!response.isAlive()) {
         if (isApplicationMasterHighAvailabilityEnabled) {
           LOG.warn("Failed to establish connection with {}. Checking for new 
AM", coordinatorUrl);
-          if (checkAndEstablishConnectionWithNewAM()) {
+          try {
+            if (checkAndEstablishConnectionWithNewAM()) {
+              return;
+            }
+          } catch (Exception e) {
+            // On exception in re-establish connection with new AM, force exit.
+            LOG.error("Exception trying to connect with new AM", e);
+            forceExit("failure in establishing cconnection with new AM", 0);
             return;
           }
         }
-        scheduler.schedule(() -> {
-          // On timeout of container shutting down, force exit.
-          LOG.error("Graceful shutdown timeout expired. Force exiting.");
-          ThreadUtil.logThreadDump("Thread dump at heartbeat monitor shutdown 
timeout.");
-          System.exit(1);
-        }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS);
-        onContainerExpired.run();
+        // On timeout of container shutting down, force exit.
+        forceExit("Graceful shutdown timeout expired. Force exiting.", 
SHUTDOWN_TIMOUT_MS);
       }
     }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS);
     started = true;
@@ -135,7 +138,7 @@ public class ContainerHeartbeatMonitor {
         }
       } catch (InterruptedException e) {
         LOG.warn("Interrupted during sleep.");
-        Thread.currentThread().interrupt();
+        throw new SamzaException(e);
       }
       attempt++;
     }
@@ -147,6 +150,15 @@ public class ContainerHeartbeatMonitor {
     return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
   }
 
+  private void forceExit(String message, int timeout) {
+    scheduler.schedule(() -> {
+      LOG.error(message);
+      ThreadUtil.logThreadDump("Thread dump at heartbeat monitor: due to " + 
message);
+      System.exit(1);
+    }, timeout, TimeUnit.MILLISECONDS);
+    onContainerExpired.run();
+  }
+
   private static class HeartbeatThreadFactory implements ThreadFactory {
     private static final String PREFIX = "Samza-" + 
ContainerHeartbeatMonitor.class.getSimpleName() + "-";
     private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
diff --git 
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala 
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index b9469cb..0462663 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -24,6 +24,7 @@ import java.util
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
+import org.apache.samza.coordinator.CoordinationConstants
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
 import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, 
CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
@@ -198,6 +199,11 @@ object CoordinatorStreamUtil extends Logging {
         keysToRemove = keysToRemove.filter(configKey => 
!JobConfig.isAutosizingConfig(configKey))
       }
 
+      if (jobConfig.getApplicationMasterHighAvailabilityEnabled) {
+        // if AM HA is enabled then retain AM url as running containers are 
fetching it from c-stream until new AM publishes new AM url.
+        keysToRemove = keysToRemove.filter(configKey => 
!(configKey.equals(CoordinationConstants.YARN_COORDINATOR_URL)))
+      }
+
       info("Deleting old configs that are no longer defined: 
%s".format(keysToRemove))
       keysToRemove.foreach(key => { coordinatorSystemProducer.send(new 
Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
     }
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
 
b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index ff297d1..5c0bf16 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -174,6 +174,30 @@ public class TestContainerHeartbeatMonitor {
     this.containerHeartbeatMonitor.stop();
     verify(this.scheduler).shutdown();
   }
+
+  @Test
+  public void testConnectToNewAMSerdeException() throws InterruptedException {
+    String containerExecutionId = "0";
+    String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";;
+    this.containerHeartbeatMonitor =
+        spy(new ContainerHeartbeatMonitor(this.onExpired, 
this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
+            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    CoordinatorStreamValueSerde serde = new 
CoordinatorStreamValueSerde(SetConfig.TYPE);
+    
when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
+    
when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl,
 containerExecutionId)).thenReturn(this.containerHeartbeatClient);
+    
when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new
 NullPointerException("serde failed"));
+
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = 
this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should have been submitted
+    verify(this.scheduler).schedule(any(Runnable.class), eq(0L), 
eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
   /**
    * Build a mock {@link ScheduledExecutorService} which will execute a 
fixed-rate task once. It will count down on
    * {@code schedulerFixedRateExecutionLatch} when the task is finished 
executing.

Reply via email to