This is an automated email from the ASF dual-hosted git repository.

iwasakims pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4f860f8ac21 MAPREDUCE-7369. Fixed MapReduce tasks timing out when 
they spend more time on MultipleOutputs#close (#4247)
4f860f8ac21 is described below

commit 4f860f8ac214b18a6400fc514b67a7d18ea89cb3
Author: Ashutosh Gupta <ashutosh.gu...@st.niituniversity.in>
AuthorDate: Mon Jun 20 09:01:01 2022 +0100

    MAPREDUCE-7369. Fixed MapReduce tasks timing out when they spend more time on 
MultipleOutputs#close (#4247)
    
    Contributed by Ravuri Sushma sree.
    
    Co-authored-by: Ashutosh Gupta <ashu...@amazon.com>
    (cherry picked from commit 36c4be819ff9de471ff29c8affdf22fa4bf225bc)
    
     Conflicts:
            
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
---
 .../hadoop/mapred/TaskAttemptListenerImpl.java     | 15 ++++--
 .../hadoop/mapred/TestTaskAttemptListenerImpl.java | 57 ++++++++++++++++------
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   |  7 +++
 .../src/main/resources/mapred-default.xml          |  7 +++
 4 files changed, 67 insertions(+), 19 deletions(-)

diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index c80ead4a46d..5dffd735fda 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -50,8 +54,8 @@ import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
-import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import 
org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@@ -61,10 +65,6 @@ import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import 
org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * This class is responsible for talking to the task umblical.
@@ -409,6 +409,11 @@ public class TaskAttemptListenerImpl extends 
CompositeService
       if (LOG.isDebugEnabled()) {
         LOG.debug("Ping from " + taskAttemptID.toString());
       }
+      // Consider ping from the tasks for liveliness check
+      if 
(getConfig().getBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK,
+          MRJobConfig.DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK)) {
+        taskHeartbeatHandler.progressing(yarnAttemptID);
+      }
       return feedback;
     }
 
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index f8b8c6ccdf1..b5a7694e4cc 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -17,23 +17,30 @@
 */
 package org.apache.hadoop.mapred;
 
-import java.util.function.Supplier;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
 import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -48,9 +55,9 @@ import 
org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import 
org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
-import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -60,17 +67,22 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.junit.After;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests the behavior of TaskAttemptListenerImpl.
@@ -417,6 +429,23 @@ public class TestTaskAttemptListenerImpl {
     verify(hbHandler).progressing(eq(attemptId));
   }
 
+  @Test
+  public void testPingUpdateProgress() throws IOException, 
InterruptedException {
+    configureMocks();
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK, 
true);
+    listener.init(conf);
+    listener.start();
+    listener.registerPendingTask(task, wid);
+    listener.registerLaunchedTask(attemptId, wid);
+    verify(hbHandler).register(attemptId);
+
+    // make sure a ping does report progress
+    AMFeedback feedback = listener.statusUpdate(attemptID, null);
+    assertTrue(feedback.getTaskFound());
+    verify(hbHandler, times(1)).progressing(eq(attemptId));
+  }
+
   @Test
   public void testSingleStatusUpdate()
       throws IOException, InterruptedException {
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index a90c58dd28b..15d57a6746b 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -919,6 +919,13 @@ public interface MRJobConfig {
     MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
   public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
 
+  /** Whether to consider ping from tasks in liveliness check. */
+  String MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK =
+      "mapreduce.task.ping-for-liveliness-check.enabled";
+  boolean DEFAULT_MR_TASK_ENABLE_PING_FOR_LIVELINESS_CHECK
+      = false;
+
+
   /**
    * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
    * milliseconds before aborting. During this interval, AM will still try
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index d315a00ba4a..ac7948f92a4 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -286,6 +286,13 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.task.ping-for-liveliness-check.enabled</name>
+  <value>false</value>
+  <description>Whether to consider ping from tasks in liveliness check.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.map.memory.mb</name>
   <value>-1</value>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to