This is an automated email from the ASF dual-hosted git repository. iwasakims pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 4f860f8ac21 MAPREDUCE-7369. Fixed MapReduce tasks timing out when spends more time on MultipleOutputs#close (#4247) 4f860f8ac21 is described below commit 4f860f8ac214b18a6400fc514b67a7d18ea89cb3 Author: Ashutosh Gupta <ashutosh.gu...@st.niituniversity.in> AuthorDate: Mon Jun 20 09:01:01 2022 +0100 MAPREDUCE-7369. Fixed MapReduce tasks timing out when spends more time on MultipleOutputs#close (#4247) Contributed by Ravuri Sushma sree. Co-authored-by: Ashutosh Gupta <ashu...@amazon.com> (cherry picked from commit 36c4be819ff9de471ff29c8affdf22fa4bf225bc) Conflicts: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java --- .../hadoop/mapred/TaskAttemptListenerImpl.java | 15 ++++-- .../hadoop/mapred/TestTaskAttemptListenerImpl.java | 57 ++++++++++++++++------ .../org/apache/hadoop/mapreduce/MRJobConfig.java | 7 +++ .../src/main/resources/mapred-default.xml | 7 +++ 4 files changed, 67 insertions(+), 19 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index c80ead4a46d..5dffd735fda 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.ProtocolSignature; @@ -50,8 +54,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; @@ -61,10 +65,6 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; /** * This class is responsible for talking to the task umblical. @@ -409,6 +409,11 @@ public class TaskAttemptListenerImpl extends CompositeService if (LOG.isDebugEnabled()) { LOG.debug("Ping from " + taskAttemptID.toString()); } + // Consider ping from the tasks for liveliness check + if (getConfig().getBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK, + MRJobConfig.DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK)) { + taskHeartbeatHandler.progressing(yarnAttemptID); + } return feedback; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index f8b8c6ccdf1..b5a7694e4cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -17,23 +17,30 @@ */ package org.apache.hadoop.mapred; -import java.util.function.Supplier; -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapreduce.checkpoint.EnumCounter; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.checkpoint.CheckpointID; +import org.apache.hadoop.mapreduce.checkpoint.EnumCounter; import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; @@ -48,9 +55,9 @@ import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy; -import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.event.Dispatcher; @@ -60,17 +67,22 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; -import org.junit.After; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests the behavior of TaskAttemptListenerImpl. @@ -417,6 +429,23 @@ public class TestTaskAttemptListenerImpl { verify(hbHandler).progressing(eq(attemptId)); } + @Test + public void testPingUpdateProgress() throws IOException, InterruptedException { + configureMocks(); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK, true); + listener.init(conf); + listener.start(); + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptId, wid); + verify(hbHandler).register(attemptId); + + // make sure a ping does report progress + AMFeedback feedback = listener.statusUpdate(attemptID, null); + assertTrue(feedback.getTaskFound()); + verify(hbHandler, times(1)).progressing(eq(attemptId)); + } + @Test public void testSingleStatusUpdate() throws IOException, InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index a90c58dd28b..15d57a6746b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -919,6 +919,13 @@ public interface MRJobConfig { MR_AM_PREFIX + "scheduler.heartbeat.interval-ms"; public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000; + /** Whether to consider ping from tasks in liveliness check. */ + String MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK = + "mapreduce.task.ping-for-liveliness-check.enabled"; + boolean DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK + = false; + + /** * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS * milliseconds before aborting. During this interval, AM will still try diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index d315a00ba4a..ac7948f92a4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -286,6 +286,13 @@ </description> </property> +<property> + <name>mapreduce.task.ping-for-liveliness-check.enabled</name> + <value>false</value> + <description>Whether to consider ping from tasks in liveliness check. + </description> +</property> + <property> <name>mapreduce.map.memory.mb</name> <value>-1</value> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org