This is an automated email from the ASF dual-hosted git repository.

aasha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 83dc73d  HIVE-25534: Error when executing DistCp on file system not supporting XAttrs (#2650)(Haymant Mangla, reviewed by Pravin Kumar Sinha)
83dc73d is described below

commit 83dc73d9d6bbbb8482327046d7071cddd01e23c9
Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com>
AuthorDate: Fri Oct 1 14:46:27 2021 +0530

    HIVE-25534: Error when executing DistCp on file system not supporting XAttrs (#2650)(Haymant Mangla, reviewed by Pravin Kumar Sinha)
    
    * HIVE-25534: Don't preserve FileAttribute.XATTR to initialise distcp
    
    * Final Review
    
    * new
    
    * Minor correction
---
 .../apache/hadoop/hive/shims/Hadoop23Shims.java    | 47 ++++++------
 .../hadoop/hive/shims/TestHadoop23Shims.java       | 89 +++++++++++++++++++++-
 2 files changed, 109 insertions(+), 27 deletions(-)

diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 36fe5a0..a611dda 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -277,6 +277,16 @@ public class Hadoop23Shims extends HadoopShimsSecure {
         equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
   }
 
+  private boolean checkFileSystemXAttrSupport(FileSystem fs) throws 
IOException {
+    try {
+      fs.getXAttrs(new Path(Path.SEPARATOR));
+      return true;
+    } catch (UnsupportedOperationException e) {
+      LOG.warn("XAttr won't be preserved since it is not supported for file 
system: " + fs.getUri());
+      return false;
+    }
+  }
+
   /**
    * Returns a shim to wrap MiniMrCluster
    */
@@ -1102,10 +1112,10 @@ public class Hadoop23Shims extends HadoopShimsSecure {
 
   private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
 
-  List<String> constructDistCpParams(List<Path> srcPaths, Path dst, 
Configuration conf) {
+  List<String> constructDistCpParams(List<Path> srcPaths, Path dst, 
Configuration conf) throws IOException {
     // -update and -delete are mandatory options for directory copy to work.
-    // -pbx is default preserve options if user doesn't pass any.
-    List<String> params = constructDistCpDefaultParams(conf);
+    List<String> params = constructDistCpDefaultParams(conf, 
dst.getFileSystem(conf),
+            srcPaths.get(0).getFileSystem(conf));
     if (!params.contains("-delete")) {
       params.add("-delete");
     }
@@ -1116,7 +1126,8 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     return params;
   }
 
-  private List<String> constructDistCpDefaultParams(Configuration conf) {
+  private List<String> constructDistCpDefaultParams(Configuration conf, 
FileSystem dstFs,
+                                                    FileSystem sourceFs) 
throws IOException {
     List<String> params = new ArrayList<String>();
     boolean needToAddPreserveOption = true;
     for (Map.Entry<String,String> entry : 
conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
@@ -1131,7 +1142,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
       }
     }
     if (needToAddPreserveOption) {
-      params.add("-pbx");
+      params.add((checkFileSystemXAttrSupport(dstFs) && 
checkFileSystemXAttrSupport(sourceFs)) ? "-pbx" : "-pb");
     }
     if (!params.contains("-update")) {
       params.add("-update");
@@ -1150,9 +1161,10 @@ public class Hadoop23Shims extends HadoopShimsSecure {
    * @return
    */
   List<String> constructDistCpWithSnapshotParams(List<Path> srcPaths, Path 
dst, String sourceSnap, String destSnap,
-      Configuration conf, String diff) {
+      Configuration conf, String diff) throws IOException {
     // Get the default distcp params
-    List<String> params = constructDistCpDefaultParams(conf);
+    List<String> params = constructDistCpDefaultParams(conf, 
dst.getFileSystem(conf),
+            srcPaths.get(0).getFileSystem(conf));
     if (params.contains("-delete")) {
       params.remove("-delete");
     }
@@ -1192,18 +1204,11 @@ public class Hadoop23Shims extends HadoopShimsSecure {
 
   @Override
   public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) 
throws IOException {
-       DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
-               .withSyncFolder(true)
-               .withDeleteMissing(true)
-               .preserve(FileAttribute.BLOCKSIZE)
-               .preserve(FileAttribute.XATTR)
-               .build();
-
     // Creates the command-line parameters for distcp
     List<String> params = constructDistCpParams(srcPaths, dst, conf);
     DistCp distcp = null;
     try {
-      distcp = new DistCp(conf, options);
+      distcp = new DistCp(conf, null);
       distcp.getConf().setBoolean("mapred.mapper.new-api", true);
 
       // HIVE-13704 states that we should use run() instead of execute() due 
to a hadoop known issue
@@ -1230,14 +1235,10 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   public boolean runDistCpWithSnapshots(String oldSnapshot, String 
newSnapshot, List<Path> srcPaths, Path dst,
       boolean overwriteTarget, Configuration conf)
       throws IOException {
-    DistCpOptions options =
-        new DistCpOptions.Builder(srcPaths, 
dst).withSyncFolder(true).withUseDiff(oldSnapshot, newSnapshot)
-        
.preserve(FileAttribute.BLOCKSIZE).preserve(FileAttribute.XATTR).build();
-
     List<String> params = constructDistCpWithSnapshotParams(srcPaths, dst, 
oldSnapshot, newSnapshot, conf, "-diff");
     try {
-      conf.setBoolean("mapred.mapper.new-api", true);
-      DistCp distcp = new DistCp(conf, options);
+      DistCp distcp = new DistCp(conf, null);
+      distcp.getConf().setBoolean("mapred.mapper.new-api", true);
       int returnCode = distcp.run(params.toArray(new String[0]));
       if (returnCode == 0) {
         return true;
@@ -1253,7 +1254,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
           LOG.warn("Copy failed due to target modified. Attempting to restore 
back the target. source: {} target: {} "
               + "snapshot: {}", srcPaths, dst, oldSnapshot);
           List<String> rParams = constructDistCpWithSnapshotParams(srcPaths, 
dst, ".", oldSnapshot, conf, "-rdiff");
-          DistCp rDistcp = new DistCp(conf, options);
+          DistCp rDistcp = new DistCp(conf, null);
           returnCode = rDistcp.run(rParams.toArray(new String[0]));
           if (returnCode == 0) {
             LOG.info("Target restored to previous state.  source: {} target: 
{} snapshot: {}. Reattempting to copy.",
@@ -1273,8 +1274,6 @@ public class Hadoop23Shims extends HadoopShimsSecure {
       }
     } catch (Exception e) {
       throw new IOException("Cannot execute DistCp process: ", e);
-    } finally {
-      conf.setBoolean("mapred.mapper.new-api", false);
     }
     return false;
   }
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
index e82895a..885c2d5 100644
--- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -19,26 +19,43 @@
 package org.apache.hadoop.hive.shims;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
 
 
 public class TestHadoop23Shims {
 
+  private Path getMockedPath(boolean supportXAttr) throws IOException {
+    FileSystem fs = mock(FileSystem.class);
+    if (supportXAttr) {
+      when(fs.getXAttrs(any())).thenReturn(new HashMap<>());
+    } else {
+      when(fs.getXAttrs(any())).thenThrow(
+              new UnsupportedOperationException("XAttr not supported for file 
system."));
+    }
+    Path path = mock(Path.class);
+    when(path.getFileSystem(any())).thenReturn(fs);
+    return path;
+  }
+  
   @Test
-  public void testConstructDistCpParams() {
+  public void testConstructDistCpParams() throws Exception {
     Path copySrc = new Path("copySrc");
     Path copyDst = new Path("copyDst");
     Configuration conf = new Configuration();
@@ -47,7 +64,6 @@ public class TestHadoop23Shims {
     List<String> paramsDefault = 
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
 
     assertEquals(5, paramsDefault.size());
-    assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
     assertTrue("Distcp -update set by default", 
paramsDefault.contains("-update"));
     assertTrue("Distcp -delete set by default", 
paramsDefault.contains("-delete"));
     assertEquals(copySrc.toString(), paramsDefault.get(3));
@@ -94,6 +110,73 @@ public class TestHadoop23Shims {
 
   }
 
+  @Test
+  public void testXAttrNotPreservedDueToDestFS() throws Exception {
+    Configuration conf = new Configuration();
+    Path copySrc = getMockedPath(true);
+    Path copyDst = getMockedPath(false);
+
+    Hadoop23Shims shims = new Hadoop23Shims();
+    List<String> paramsDefault = 
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+    assertEquals(5, paramsDefault.size());
+    assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb"));
+    assertTrue("Distcp -update set by default", 
paramsDefault.contains("-update"));
+    assertTrue("Distcp -delete set by default", 
paramsDefault.contains("-delete"));
+    assertEquals(copySrc.toString(), paramsDefault.get(3));
+    assertEquals(copyDst.toString(), paramsDefault.get(4));
+  }
+
+  @Test
+  public void testXAttrNotPreservedDueToSrcFS() throws Exception {
+    Configuration conf = new Configuration();
+    Path copySrc = getMockedPath(false);
+    Path copyDst = getMockedPath(true);
+
+    Hadoop23Shims shims = new Hadoop23Shims();
+    List<String> paramsDefault = 
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+    assertEquals(5, paramsDefault.size());
+    assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb"));
+    assertTrue("Distcp -update set by default", 
paramsDefault.contains("-update"));
+    assertTrue("Distcp -delete set by default", 
paramsDefault.contains("-delete"));
+    assertEquals(copySrc.toString(), paramsDefault.get(3));
+    assertEquals(copyDst.toString(), paramsDefault.get(4));
+  }
+
+  @Test
+  public void testXAttrPreserved() throws Exception {
+    Configuration conf = new Configuration();
+    Path copySrc = getMockedPath(true);
+    Path copyDst = getMockedPath(true);
+    Hadoop23Shims shims = new Hadoop23Shims();
+    List<String> paramsDefault = 
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+    assertEquals(5, paramsDefault.size());
+    assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
+    assertTrue("Distcp -update set by default", 
paramsDefault.contains("-update"));
+    assertTrue("Distcp -delete set by default", 
paramsDefault.contains("-delete"));
+    assertEquals(copySrc.toString(), paramsDefault.get(3));
+    assertEquals(copyDst.toString(), paramsDefault.get(4));
+  }
+
+  @Test
+  public void testPreserveOptionsOverwritenByUser() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("distcp.options.pbx", "");
+    Path copySrc = getMockedPath(false);
+    Path copyDst = getMockedPath(false);
+    Hadoop23Shims shims = new Hadoop23Shims();
+    List<String> paramsDefault = 
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+    assertEquals(5, paramsDefault.size());
+    assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
+    assertTrue("Distcp -update set by default", 
paramsDefault.contains("-update"));
+    assertTrue("Distcp -delete set by default", 
paramsDefault.contains("-delete"));
+    assertEquals(copySrc.toString(), paramsDefault.get(3));
+    assertEquals(copyDst.toString(), paramsDefault.get(4));
+  }
+
   @Test(expected = FileNotFoundException.class)
   public void testGetFileIdForNonexistingPath() throws Exception {
     Hadoop23Shims shims = new Hadoop23Shims();

Reply via email to