This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 2033449  HIVE-22931: HoS dynamic partitioning fails with blobstore optimizations off (Adam Szita, reviewed by Marta Kuczora)
2033449 is described below

commit 2033449a98b962040338cf2e5d26d85c0ebb4c08
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Wed Feb 26 10:13:26 2020 +0100

    HIVE-22931: HoS dynamic partitioning fails with blobstore optimizations off (Adam Szita, reviewed by Marta Kuczora)
---
 .../hadoop/hive/ql/parse/spark/GenSparkUtils.java | 13 ++++-
 .../hive/ql/exec/spark/TestSparkUtilities.java    | 63 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 5fb5fd3..92178cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -418,7 +418,18 @@ public class GenSparkUtils {
     // it must be on the same file system as the current destination
     Context baseCtx = parseCtx.getContext();
-    Path tmpDir = baseCtx.getExternalTmpPath(dest);
+    Path tmpDir = null;
+
+    // The dest path (the output location of the final job) may be an -mr-1000X dir if all of the below are true:
+    //  - the target table location's FS is not HDFS but either blob storage or the local FS
+    //  - HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR is set to false (the default)
+    //  - HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED is false
+    // In that case we shouldn't request an external tmp dir, as it would end up inside the MR temp dir.
+    if (baseCtx.isMRTmpFileURI(dest.toUri().getPath())) {
+      tmpDir = baseCtx.getMRTmpPath();
+    } else {
+      tmpDir = baseCtx.getExternalTmpPath(dest);
+    }

     // Change all the linked file sink descriptors
     if (fileSinkDesc.getLinkedFileSinkDesc() != null) {
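The essence of the fix above is the new branch on Context.isMRTmpFileURI: if the destination is already under the query's MR scratch space, GenSparkUtils reuses an MR tmp path instead of requesting an external tmp dir that would end up nested inside it. Below is a minimal standalone sketch of that decision (sketch only, not Hive source: the class name, helper, and example paths are made up, and Hive's real check compares the path against the session's scratch directory rather than matching a substring):

import java.net.URI;

/** Illustrative sketch of the tmp-dir choice; not part of the commit. */
public class TmpDirChoiceSketch {

  // Hypothetical stand-in for Context.isMRTmpFileURI: is this path already
  // an intermediate (-mr-) dir under the job's scratch space?
  static boolean looksLikeMRTmpPath(String path) {
    return path.contains("/-mr-");
  }

  // Hypothetical stand-in for the patched logic in GenSparkUtils.
  static String chooseTmpDir(String dest) {
    if (looksLikeMRTmpPath(URI.create(dest).getPath())) {
      // dest is already scratch space: hand out another MR tmp path
      // instead of nesting an -ext- staging dir inside the -mr- dir.
      return "/tmp/hive/exec1/-mr-10002";
    }
    // Normal case: an external tmp dir next to the final destination.
    return dest + "/.hive-staging_-ext-10002";
  }

  public static void main(String[] args) {
    System.out.println(chooseTmpDir("hdfs:/tmp/hive/exec1/-mr-10001")); // reuses MR tmp path
    System.out.println(chooseTmpDir("s3a://bucket/warehouse/t"));       // external tmp dir
  }
}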
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java
index f797f30..3af0006 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkUtilities.java
@@ -17,11 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.common.util.Ref;
+
 import org.junit.Test;

 import java.util.ArrayList;
@@ -37,6 +46,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;

 /**
@@ -87,6 +97,59 @@ public class TestSparkUtilities {

   }

+  @Test
+  public void testCreateMoveTaskDoesntCreateCascadeTempDirs() throws Exception {
+    FileSinkOperator fsOp = mock(FileSinkOperator.class);
+    ParseContext pctx = mock(ParseContext.class);
+    Configuration conf = new Configuration();
+    conf.set("_hive.hdfs.session.path", "hdfs:/dummypath");
+    conf.set("_hive.local.session.path", "hdfs:/dummypath");
+    Context ctx = new Context(conf);
+    String executionId = ctx.getExecutionId();
+    Context ctxSpy = spy(ctx);
+    FileSinkDesc fileSinkDesc = mock(FileSinkDesc.class);
+
+    Path mrPath = new Path("hdfs:/tmp/.staging/" + executionId + "/-mr-10001");
+    Path mrPath2 = new Path("hdfs:/tmp/.staging/" + executionId + "/-mr-10002");
+    Path extPath = new Path("hdfs:/tmp/.staging/" + executionId + "/-ext-10001");
+    Path extPath2 = new Path("hdfs:/tmp/.staging/" + executionId + "/-ext-10002");
+
+    final Ref<Path> expectedPathRef = new Ref<>(mrPath);
+    final Ref<Path> testPathRef = new Ref<>(extPath);
+
+    doAnswer(invocationOnMock -> {
+      return ctxSpy;
+    }).when(pctx).getContext();
+    doAnswer(invocationOnMock -> {
+      return mrPath2;
+    }).when(ctxSpy).getMRTmpPath();
+    doAnswer(invocationOnMock -> {
+      return extPath2;
+    }).when(ctxSpy).getExternalTmpPath(any(Path.class));
+    doAnswer(invocationOnMock -> {
+      return testPathRef.value;
+    }).when(fileSinkDesc).getFinalDirName();
+    doAnswer(invocationOnMock -> {
+      return null;
+    }).when(fileSinkDesc).getLinkedFileSinkDesc();
+    doAnswer(invocationOnMock -> {
+      return fileSinkDesc;
+    }).when(fsOp).getConf();
+
+    doAnswer(invocationOnMock -> {
+      assertEquals(expectedPathRef.value, invocationOnMock.getArgumentAt(0, Path.class));
+      return null;
+    }).when(fileSinkDesc).setDirName(any(Path.class));
+
+    testPathRef.value = mrPath;
+    expectedPathRef.value = mrPath2;
+    GenSparkUtils.createMoveTask(null, true, fsOp, pctx, null, null, null);
+
+    testPathRef.value = extPath;
+    expectedPathRef.value = extPath2;
+    GenSparkUtils.createMoveTask(null, true, fsOp, pctx, null, null, null);
+  }
+
   private SparkSession resolve(Future<SparkSession> future) {
     try {
       return future.get();
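A note on the test's design: a single doAnswer stub on FileSinkDesc.setDirName covers both scenarios, because the expected path lives in a mutable Ref that the test swaps between the two createMoveTask calls, so each call is verified against a different expectation without re-stubbing. A minimal standalone sketch of that pattern (sketch only, not part of the commit; the Sink interface is hypothetical, and java.util.concurrent.atomic.AtomicReference stands in for Hive's Ref):

import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class RefSwapSketchTest {

  // Hypothetical collaborator standing in for FileSinkDesc.
  interface Sink {
    void setDirName(String dir);
  }

  @Test
  public void oneStubCoversTwoScenarios() {
    AtomicReference<String> expected = new AtomicReference<>();
    Sink sink = mock(Sink.class);

    // Stub once; the assertion reads whatever 'expected' holds at call time.
    doAnswer(inv -> {
      assertEquals(expected.get(), inv.getArguments()[0]);
      return null;
    }).when(sink).setDirName(anyString());

    expected.set("-mr-10002");
    sink.setDirName("-mr-10002");   // scenario 1: MR tmp path expected

    expected.set("-ext-10002");
    sink.setDirName("-ext-10002");  // scenario 2: external tmp path expected
  }
}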