This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push: new 3e194cb56 TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor) 3e194cb56 is described below commit 3e194cb56fa38099e0b9c650682398e8a0ef93a0 Author: mudit-97 <32608527+mudit...@users.noreply.github.com> AuthorDate: Fri Feb 24 15:34:12 2023 +0530 TEZ-4474: Added config to fail the DAG status when recovery data is missing (#266) (Mudit Sharma reviewed by Laszlo Bodor) --- .../org/apache/tez/dag/api/TezConfiguration.java | 12 ++ .../java/org/apache/tez/dag/app/DAGAppMaster.java | 26 +++- .../org/apache/tez/dag/app/TestDAGAppMaster.java | 156 +++++++++++++++++---- 3 files changed, 162 insertions(+), 32 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index cd6d02249..6d7783624 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1828,6 +1828,18 @@ public class TezConfiguration extends Configuration { TEZ_PREFIX + "dag.recovery.enabled"; public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true; + + /** + * Boolean value. When set, this enables AM to fail when DAG recovery is enabled and + * restarted app master did not find anything to recover + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA = + TEZ_AM_PREFIX + "failure.on.missing.recovery.data"; + public static final boolean TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT = false; + /** * Int value. Size in bytes for the IO buffer size while processing the recovery file. * Expert level setting. diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index a5d7b7db9..2ef72f8c5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1916,10 +1916,7 @@ public class DAGAppMaster extends AbstractService { LOG.info("Recovering data from previous attempts" + ", currentAttemptId=" + this.appAttemptID.getAttemptId()); this.state = DAGAppMasterState.RECOVERING; - RecoveryParser recoveryParser = new RecoveryParser( - this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId()); - DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData(); - return recoveredDAGData; + return parseDAGFromRecoveryData(); } } finally { hadoopShim.clearHadoopCallerContext(); @@ -1928,6 +1925,27 @@ public class DAGAppMaster extends AbstractService { return null; } + private DAGRecoveryData parseDAGFromRecoveryData() throws IOException { + RecoveryParser recoveryParser = new RecoveryParser( + this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId()); + DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData(); + + /** + * Parsed recovery data can be NULL in scenarios where AM shutdown prematurely during the first attempt + * due to some FATAL error, if that happens recovery stream is not closed and no data is flushed on File System + * In cases like above, in next future attempts of application, recovery returns NULL instead of failing the DAG + * This config when enabled, throws an IOException for such cases, and it assumes that caller will catch these + * IOExceptions and will fail the DAG, which happens currently, JIRA: https://issues.apache.org/jira/browse/TEZ-4474 + */ + if(Objects.isNull(recoveredDAGData) && amConf.getBoolean( + TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, + TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT)) { + throw new IOException(String.format("Found nothing to recover in currentAttemptId=%s from recovery data dir=%s", + this.appAttemptID.getAttemptId(), this.recoveryDataDir)); + } + return recoveredDAGData; + } + @Override public void serviceStart() throws Exception { //start all the components diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index d8167dbcc..9fe8e3e72 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -14,54 +14,32 @@ package org.apache.tez.dag.app; -import org.apache.hadoop.yarn.util.MonotonicClock; -import org.apache.tez.dag.app.dag.DAGState; -import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.records.TezVertexID; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.tez.common.Preconditions; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.client.TezApiVersionInfo; +import org.apache.tez.common.Preconditions; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenIdentifier; @@ -78,13 +56,44 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.impl.DAGImpl; import org.apache.tez.dag.app.rm.TaskSchedulerManager; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestDAGAppMaster { @@ -332,6 +341,97 @@ public class TestDAGAppMaster { assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName()); } + @Test(timeout = 60000) + public void testShutdownTezAMWithMissingRecoveryAndFailureOnMissingData() throws Exception { + + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true); + conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString()); + conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true); + conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true); + + /* + Setting very high timeout because in case when TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA is set, it should + not time out, it should get shutdown earlier only without the timeout flow kicking in + */ + conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1000000000); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2); + + FileSystem mockFs = mock(FileSystem.class); + when(mockFs.exists(any())).thenReturn(false); + + DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true); + + dam.init(conf); + Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS"); + field.setAccessible(true); + field.set(dam, mockFs); + + dam.start(); + + ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class); + // This ensures that recovery data file system was called for getting summary files, and it will return false + verify(mockFs, times(2)).exists(captor.capture()); + + Assert.assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/summary")); + Assert.assertTrue(captor.getAllValues().get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred")); + + verify(dam.mockScheduler).setShouldUnregisterFlag(); + verify(dam.mockShutdown).shutdown(); + + /* + * Since the TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA config is set, + * DAG will be in ERRORed state if recovery was missing for attempts > 1 + */ + assertEquals(DAGAppMasterState.ERROR, dam.getState()); + } + + @Test + public void testShutdownTezAMWithMissingRecoveryAndNoFailureOnMissingData() throws Exception { + + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true); + conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString()); + conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, false); + conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true); + conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2); + + FileSystem mockFs = mock(FileSystem.class); + when(mockFs.exists(any())).thenReturn(false); + + DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true); + + dam.init(conf); + Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS"); + field.setAccessible(true); + field.set(dam, mockFs); + + dam.start(); + // Waiting for session timeout interval to kick in, which is set to 1 s + Thread.sleep(2000); + + ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class); + // This ensures that recovery data file system was called for getting summary files, and it will return false + verify(mockFs, times(2)).exists(captor.capture()); + + Assert.assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/summary")); + Assert.assertTrue(captor.getAllValues().get(0).toString().contains("/recovery/1/RecoveryFatalErrorOccurred")); + + verify(dam.mockScheduler).setShouldUnregisterFlag(); + verify(dam.mockShutdown).shutdown(); + + /* + * Since the TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA config is unset, + * DAG will be in SUCCEEDED state if recovery was missing and timeout got triggered for attempts > 1 + */ + assertEquals(DAGAppMasterState.SUCCEEDED, dam.getState()); + } + private void verifyDescAndMap(List<NamedEntityDescriptor> descriptors, BiMap<String, Integer> map, int numExpected, boolean verifyPayload, String... expectedNames) throws