This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 220cbe2 SAMZA-2611: [AM-HA] heartbeat reestablish causes container's heartbeat thread to die (#1452) 220cbe2 is described below commit 220cbe29911718b5b49509d3f42e8851237f6249 Author: lakshmi-manasa-g <mgadup...@linkedin.com> AuthorDate: Mon Dec 7 07:50:25 2020 -0800 SAMZA-2611: [AM-HA] heartbeat reestablish causes container's heartbeat thread to die (#1452) Symptom: When new AM takes a long time to start up, already running container's heartbeat thread silently dies and does not make any heartbeat requests to the new AM. Cause: AM url (yarn.am.tracking.url) key-value is removed from Coordinator stream when new AM is starting up - as this config is present in old config (aka coordinator stream) but not in the new AM generated config. This causes the running container to fetch a null when it is constantly fetching value for this key and thus throws NPE. Changes: When AMHA is enabled, do not remove this config --- .../samza/container/ContainerHeartbeatMonitor.java | 30 +++++++++++++++------- .../apache/samza/util/CoordinatorStreamUtil.scala | 6 +++++ .../container/TestContainerHeartbeatMonitor.java | 24 +++++++++++++++++ 3 files changed, 51 insertions(+), 9 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java index 7e20f26..0eba766 100644 --- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java +++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java @@ -26,6 +26,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.samza.SamzaException; import org.apache.samza.coordinator.CoordinationConstants; import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; import 
org.apache.samza.coordinator.stream.messages.SetConfig; @@ -92,17 +93,19 @@ public class ContainerHeartbeatMonitor { if (!response.isAlive()) { if (isApplicationMasterHighAvailabilityEnabled) { LOG.warn("Failed to establish connection with {}. Checking for new AM", coordinatorUrl); - if (checkAndEstablishConnectionWithNewAM()) { + try { + if (checkAndEstablishConnectionWithNewAM()) { + return; + } + } catch (Exception e) { + // On exception in re-establish connection with new AM, force exit. + LOG.error("Exception trying to connect with new AM", e); + forceExit("failure in establishing cconnection with new AM", 0); return; } } - scheduler.schedule(() -> { - // On timeout of container shutting down, force exit. - LOG.error("Graceful shutdown timeout expired. Force exiting."); - ThreadUtil.logThreadDump("Thread dump at heartbeat monitor shutdown timeout."); - System.exit(1); - }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS); - onContainerExpired.run(); + // On timeout of container shutting down, force exit. + forceExit("Graceful shutdown timeout expired. 
Force exiting.", SHUTDOWN_TIMOUT_MS); } }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS); started = true; @@ -135,7 +138,7 @@ public class ContainerHeartbeatMonitor { } } catch (InterruptedException e) { LOG.warn("Interrupted during sleep."); - Thread.currentThread().interrupt(); + throw new SamzaException(e); } attempt++; } @@ -147,6 +150,15 @@ public class ContainerHeartbeatMonitor { return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId); } + private void forceExit(String message, int timeout) { + scheduler.schedule(() -> { + LOG.error(message); + ThreadUtil.logThreadDump("Thread dump at heartbeat monitor: due to " + message); + System.exit(1); + }, timeout, TimeUnit.MILLISECONDS); + onContainerExpired.run(); + } + private static class HeartbeatThreadFactory implements ThreadFactory { private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-"; private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(); diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala index b9469cb..0462663 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala @@ -24,6 +24,7 @@ import java.util import org.apache.samza.SamzaException import org.apache.samza.config._ +import org.apache.samza.coordinator.CoordinationConstants import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde} import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} @@ -198,6 +199,11 @@ object CoordinatorStreamUtil extends Logging { keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey)) } + if 
(jobConfig.getApplicationMasterHighAvailabilityEnabled) { + // if AM HA is enabled then retain AM url as running containers are fetching it from c-stream until new AM publishes new AM url. + keysToRemove = keysToRemove.filter(configKey => !(configKey.equals(CoordinationConstants.YARN_COORDINATOR_URL))) + } + info("Deleting old configs that are no longer defined: %s".format(keysToRemove)) keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) }) } diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java index ff297d1..5c0bf16 100644 --- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java +++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java @@ -174,6 +174,30 @@ public class TestContainerHeartbeatMonitor { this.containerHeartbeatMonitor.stop(); verify(this.scheduler).shutdown(); } + + @Test + public void testConnectToNewAMSerdeException() throws InterruptedException { + String containerExecutionId = "0"; + String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com"; + this.containerHeartbeatMonitor = + spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL, + containerExecutionId, coordinatorStreamStore, true, 5, 10)); + CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE); + when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE); + when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient); + when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new NullPointerException("serde failed")); + + this.containerHeartbeatMonitor.start(); + // wait for the executor 
to finish the heartbeat check task + boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS); + assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted); + // shutdown task should have been submitted + verify(this.scheduler).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS)); + verify(this.onExpired).run(); + + this.containerHeartbeatMonitor.stop(); + verify(this.scheduler).shutdown(); + } /** * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.