This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2033449  HIVE-22931: HoS dynamic partitioning fails with blobstore optimizations off (Adam Szita, reviewed by Marta Kuczora)
2033449 is described below

commit 2033449a98b962040338cf2e5d26d85c0ebb4c08
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Wed Feb 26 10:13:26 2020 +0100

    HIVE-22931: HoS dynamic partitioning fails with blobstore optimizations off (Adam Szita, reviewed by Marta Kuczora)
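    
    In essence, the patch guards the scratch-dir choice in GenSparkUtils: if the
    file sink's destination already resolves to an intermediate -mr-1000X staging
    dir, requesting an external tmp path would nest a second staging dir inside
    the first (the "cascade" the new test checks for), and the HoS dynamic
    partitioning job fails. A minimal sketch of the idea, using only APIs visible
    in the diff below; the setup is simplified, assumes a FileSinkDesc named
    fileSinkDesc is in scope, and is not the committed code verbatim:
    
        // Hypothetical setup: the two flags (named in the patch comment) under
        // which dest can become an -mr-1000X dir; false is the scratchdir default.
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR, false);
        conf.setBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED, false);
    
        Context baseCtx = new Context(conf);
        Path dest = fileSinkDesc.getFinalDirName(); // the final output location
    
        // The guard added by this patch: if dest is already inside the MR
        // scratch space, stage there instead of nesting an -ext dir within it.
        Path tmpDir = baseCtx.isMRTmpFileURI(dest.toUri().getPath())
            ? baseCtx.getMRTmpPath()
            : baseCtx.getExternalTmpPath(dest);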
---
 .../hadoop/hive/ql/parse/spark/GenSparkUtils.java  | 13 ++++-
 .../hive/ql/exec/spark/TestSparkUtilities.java     | 63 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 5fb5fd3..92178cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -418,7 +418,18 @@ public class GenSparkUtils {
       // it must be on the same file system as the current destination
       Context baseCtx = parseCtx.getContext();
 
-      Path tmpDir = baseCtx.getExternalTmpPath(dest);
+      Path tmpDir = null;
+
+      // The dest path (the output location of the final job) may be an -mr-1000X dir if all of the following hold:
+      // - the target table's location is not on HDFS but on blob storage or a local FS
+      // - HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR is set to false (the default)
+      // - HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED is false
+      // In that case we shouldn't request an external tmp dir, as it would end up inside the MR temp dir.
+      if (baseCtx.isMRTmpFileURI(dest.toUri().getPath())) {
+        tmpDir = baseCtx.getMRTmpPath();
+      } else {
+        tmpDir = baseCtx.getExternalTmpPath(dest);
+      }
 
       // Change all the linked file sink descriptors
       if (fileSinkDesc.getLinkedFileSinkDesc() != null) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java
index f797f30..3af0006 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java
@@ -17,11 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.common.util.Ref;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -37,6 +46,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /**
@@ -87,6 +97,59 @@ public class TestSparkUtilities {
 
   }
 
+  @Test
+  public void testCreateMoveTaskDoesntCreateCascadeTempDirs() throws Exception {
+    FileSinkOperator fsOp = mock(FileSinkOperator.class);
+    ParseContext pctx = mock(ParseContext.class);
+    Configuration conf = new Configuration();
+    conf.set("_hive.hdfs.session.path", "hdfs:/dummypath");
+    conf.set("_hive.local.session.path", "hdfs:/dummypath");
+    Context ctx = new Context(conf);
+    String executionId = ctx.getExecutionId();
+    Context ctxSpy = spy(ctx);
+    FileSinkDesc fileSinkDesc = mock(FileSinkDesc.class);
+
+    Path mrPath = new Path("hdfs:/tmp/.staging/" + executionId + "/-mr-10001");
+    Path mrPath2 = new Path("hdfs:/tmp/.staging/" + executionId + "/-mr-10002");
+    Path extPath = new Path("hdfs:/tmp/.staging/" + executionId + "/-ext-10001");
+    Path extPath2 = new Path("hdfs:/tmp/.staging/" + executionId + "/-ext-10002");
+
+    final Ref<Path> expectedPathRef = new Ref<>(mrPath);
+    final Ref<Path> testPathRef = new Ref<>(extPath);
+
+    doAnswer(invocationOnMock -> {
+      return ctxSpy;
+    }).when(pctx).getContext();
+    doAnswer(invocationOnMock -> {
+      return mrPath2;
+    }).when(ctxSpy).getMRTmpPath();
+    doAnswer(invocationOnMock -> {
+      return extPath2;
+    }).when(ctxSpy).getExternalTmpPath(any(Path.class));
+    doAnswer(invocationOnMock -> {
+      return testPathRef.value;
+    }).when(fileSinkDesc).getFinalDirName();
+    doAnswer(invocationOnMock -> {
+      return null;
+    }).when(fileSinkDesc).getLinkedFileSinkDesc();
+    doAnswer(invocationOnMock -> {
+      return fileSinkDesc;
+    }).when(fsOp).getConf();
+
+    doAnswer(invocationOnMock -> {
+      assertEquals(expectedPathRef.value, invocationOnMock.getArgumentAt(0, Path.class));
+      return null;
+    }).when(fileSinkDesc).setDirName(any(Path.class));
+
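+    // Destination is already inside the MR staging area: expect the MR tmp path to be reused, not a nested dir.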
+    testPathRef.value = mrPath;
+    expectedPathRef.value = mrPath2;
+    GenSparkUtils.createMoveTask(null, true, fsOp, pctx, null, null, null);
+
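+    // Ordinary external destination: expect a fresh external tmp path, as before the fix.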
+    testPathRef.value = extPath;
+    expectedPathRef.value = extPath2;
+    GenSparkUtils.createMoveTask(null, true, fsOp, pctx, null, null, null);
+  }
+
   private SparkSession resolve(Future<SparkSession> future) {
     try {
       return future.get();
