This is an automated email from the ASF dual-hosted git repository. aasha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 83dc73d HIVE-25534: Error when executing DistCp on file system not supporting XAttrs (#2650)(Haymant Mangla, reviewed by Pravin Kumar Sinha) 83dc73d is described below commit 83dc73d9d6bbbb8482327046d7071cddd01e23c9 Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com> AuthorDate: Fri Oct 1 14:46:27 2021 +0530 HIVE-25534: Error when executing DistCp on file system not supporting XAttrs (#2650)(Haymant Mangla, reviewed by Pravin Kumar Sinha) * HIVE-25534: Don't preserve FileAttribute.XATTR to initialise distcp * Final Review * new * Minor correction --- .../apache/hadoop/hive/shims/Hadoop23Shims.java | 47 ++++++------ .../hadoop/hive/shims/TestHadoop23Shims.java | 89 +++++++++++++++++++++- 2 files changed, 109 insertions(+), 27 deletions(-) diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 36fe5a0..a611dda 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -277,6 +277,16 @@ public class Hadoop23Shims extends HadoopShimsSecure { equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER)); } + private boolean checkFileSystemXAttrSupport(FileSystem fs) throws IOException { + try { + fs.getXAttrs(new Path(Path.SEPARATOR)); + return true; + } catch (UnsupportedOperationException e) { + LOG.warn("XAttr won't be preserved since it is not supported for file system: " + fs.getUri()); + return false; + } + } + /** * Returns a shim to wrap MiniMrCluster */ @@ -1102,10 +1112,10 @@ public class Hadoop23Shims extends HadoopShimsSecure { private static final String DISTCP_OPTIONS_PREFIX = "distcp.options."; - List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration conf) { + List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration conf) throws IOException { // -update and -delete are mandatory options for directory copy to work. - // -pbx is default preserve options if user doesn't pass any. - List<String> params = constructDistCpDefaultParams(conf); + List<String> params = constructDistCpDefaultParams(conf, dst.getFileSystem(conf), + srcPaths.get(0).getFileSystem(conf)); if (!params.contains("-delete")) { params.add("-delete"); } @@ -1116,7 +1126,8 @@ public class Hadoop23Shims extends HadoopShimsSecure { return params; } - private List<String> constructDistCpDefaultParams(Configuration conf) { + private List<String> constructDistCpDefaultParams(Configuration conf, FileSystem dstFs, + FileSystem sourceFs) throws IOException { List<String> params = new ArrayList<String>(); boolean needToAddPreserveOption = true; for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){ @@ -1131,7 +1142,7 @@ public class Hadoop23Shims extends HadoopShimsSecure { } } if (needToAddPreserveOption) { - params.add("-pbx"); + params.add((checkFileSystemXAttrSupport(dstFs) && checkFileSystemXAttrSupport(sourceFs)) ? "-pbx" : "-pb"); } if (!params.contains("-update")) { params.add("-update"); @@ -1150,9 +1161,10 @@ public class Hadoop23Shims extends HadoopShimsSecure { * @return */ List<String> constructDistCpWithSnapshotParams(List<Path> srcPaths, Path dst, String sourceSnap, String destSnap, - Configuration conf, String diff) { + Configuration conf, String diff) throws IOException { // Get the default distcp params - List<String> params = constructDistCpDefaultParams(conf); + List<String> params = constructDistCpDefaultParams(conf, dst.getFileSystem(conf), + srcPaths.get(0).getFileSystem(conf)); if (params.contains("-delete")) { params.remove("-delete"); } @@ -1192,18 +1204,11 @@ public class Hadoop23Shims extends HadoopShimsSecure { @Override public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws IOException { - DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst) - .withSyncFolder(true) - .withDeleteMissing(true) - .preserve(FileAttribute.BLOCKSIZE) - .preserve(FileAttribute.XATTR) - .build(); - // Creates the command-line parameters for distcp List<String> params = constructDistCpParams(srcPaths, dst, conf); DistCp distcp = null; try { - distcp = new DistCp(conf, options); + distcp = new DistCp(conf, null); distcp.getConf().setBoolean("mapred.mapper.new-api", true); // HIVE-13704 states that we should use run() instead of execute() due to a hadoop known issue @@ -1230,14 +1235,10 @@ public class Hadoop23Shims extends HadoopShimsSecure { public boolean runDistCpWithSnapshots(String oldSnapshot, String newSnapshot, List<Path> srcPaths, Path dst, boolean overwriteTarget, Configuration conf) throws IOException { - DistCpOptions options = - new DistCpOptions.Builder(srcPaths, dst).withSyncFolder(true).withUseDiff(oldSnapshot, newSnapshot) - .preserve(FileAttribute.BLOCKSIZE).preserve(FileAttribute.XATTR).build(); - List<String> params = constructDistCpWithSnapshotParams(srcPaths, dst, oldSnapshot, newSnapshot, conf, "-diff"); try { - conf.setBoolean("mapred.mapper.new-api", true); - DistCp distcp = new DistCp(conf, options); + DistCp distcp = new DistCp(conf, null); + distcp.getConf().setBoolean("mapred.mapper.new-api", true); int returnCode = distcp.run(params.toArray(new String[0])); if (returnCode == 0) { return true; @@ -1253,7 +1254,7 @@ public class Hadoop23Shims extends HadoopShimsSecure { LOG.warn("Copy failed due to target modified. Attempting to restore back the target. source: {} target: {} " + "snapshot: {}", srcPaths, dst, oldSnapshot); List<String> rParams = constructDistCpWithSnapshotParams(srcPaths, dst, ".", oldSnapshot, conf, "-rdiff"); - DistCp rDistcp = new DistCp(conf, options); + DistCp rDistcp = new DistCp(conf, null); returnCode = rDistcp.run(rParams.toArray(new String[0])); if (returnCode == 0) { LOG.info("Target restored to previous state. source: {} target: {} snapshot: {}. Reattempting to copy.", @@ -1273,8 +1274,6 @@ public class Hadoop23Shims extends HadoopShimsSecure { } } catch (Exception e) { throw new IOException("Cannot execute DistCp process: ", e); - } finally { - conf.setBoolean("mapred.mapper.new-api", false); } return false; } diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java index e82895a..885c2d5 100644 --- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java +++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java @@ -19,26 +19,43 @@ package org.apache.hadoop.hive.shims; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; - import org.junit.Test; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; public class TestHadoop23Shims { + private Path getMockedPath(boolean supportXAttr) throws IOException { + FileSystem fs = mock(FileSystem.class); + if (supportXAttr) { + when(fs.getXAttrs(any())).thenReturn(new HashMap<>()); + } else { + when(fs.getXAttrs(any())).thenThrow( + new UnsupportedOperationException("XAttr not supported for file system.")); + } + Path path = mock(Path.class); + when(path.getFileSystem(any())).thenReturn(fs); + return path; + } + @Test - public void testConstructDistCpParams() { + public void testConstructDistCpParams() throws Exception { Path copySrc = new Path("copySrc"); Path copyDst = new Path("copyDst"); Configuration conf = new Configuration(); @@ -47,7 +64,6 @@ public class TestHadoop23Shims { List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); assertEquals(5, paramsDefault.size()); - assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx")); assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete")); assertEquals(copySrc.toString(), paramsDefault.get(3)); @@ -94,6 +110,73 @@ public class TestHadoop23Shims { } + @Test + public void testXAttrNotPreservedDueToDestFS() throws Exception { + Configuration conf = new Configuration(); + Path copySrc = getMockedPath(true); + Path copyDst = getMockedPath(false); + + Hadoop23Shims shims = new Hadoop23Shims(); + List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); + + assertEquals(5, paramsDefault.size()); + assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb")); + assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); + assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete")); + assertEquals(copySrc.toString(), paramsDefault.get(3)); + assertEquals(copyDst.toString(), paramsDefault.get(4)); + } + + @Test + public void testXAttrNotPreservedDueToSrcFS() throws Exception { + Configuration conf = new Configuration(); + Path copySrc = getMockedPath(false); + Path copyDst = getMockedPath(true); + + Hadoop23Shims shims = new Hadoop23Shims(); + List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); + + assertEquals(5, paramsDefault.size()); + assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb")); + assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); + assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete")); + assertEquals(copySrc.toString(), paramsDefault.get(3)); + assertEquals(copyDst.toString(), paramsDefault.get(4)); + } + + @Test + public void testXAttrPreserved() throws Exception { + Configuration conf = new Configuration(); + Path copySrc = getMockedPath(true); + Path copyDst = getMockedPath(true); + Hadoop23Shims shims = new Hadoop23Shims(); + List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); + + assertEquals(5, paramsDefault.size()); + assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx")); + assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); + assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete")); + assertEquals(copySrc.toString(), paramsDefault.get(3)); + assertEquals(copyDst.toString(), paramsDefault.get(4)); + } + + @Test + public void testPreserveOptionsOverwritenByUser() throws Exception { + Configuration conf = new Configuration(); + conf.set("distcp.options.pbx", ""); + Path copySrc = getMockedPath(false); + Path copyDst = getMockedPath(false); + Hadoop23Shims shims = new Hadoop23Shims(); + List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf); + + assertEquals(5, paramsDefault.size()); + assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx")); + assertTrue("Distcp -update set by default", paramsDefault.contains("-update")); + assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete")); + assertEquals(copySrc.toString(), paramsDefault.get(3)); + assertEquals(copyDst.toString(), paramsDefault.get(4)); + } + @Test(expected = FileNotFoundException.class) public void testGetFileIdForNonexistingPath() throws Exception { Hadoop23Shims shims = new Hadoop23Shims();