Repository: hive
Updated Branches:
  refs/heads/master 133d3c473 -> e33126281
HIVE-16395: ConcurrentModificationException on config object in HoS (Andrew Sherman via Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e3312628 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e3312628 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e3312628 Branch: refs/heads/master Commit: e331262813027ca2a1aae7fedcd1c8863ed6b751 Parents: 133d3c4 Author: Andrew Sherman <asher...@cloudera.com> Authored: Sun Oct 15 17:16:35 2017 -0700 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Sun Oct 15 17:16:35 2017 -0700 ---------------------------------------------------------------------- .../ql/exec/spark/HiveSparkClientFactory.java | 8 ++- .../ql/exec/spark/session/SparkSessionImpl.java | 6 +++ .../session/TestSparkSessionManagerImpl.java | 51 ++++++++++++++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 194585e..597fcab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -26,10 +26,10 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.compress.utils.CharsetNames; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.common.LogUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.session.SessionState; import 
org.apache.hive.spark.client.SparkClientUtilities; import org.slf4j.Logger; @@ -60,6 +60,8 @@ public class HiveSparkClientFactory { private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false"; private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion"; private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode"; + @VisibleForTesting + public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf"; public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception { Map<String, String> sparkConf = initiateSparkConf(hiveconf); @@ -222,6 +224,10 @@ public class HiveSparkClientFactory { sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false"); } + // Force Spark configs to be cloned by default + sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true"); + + // Set the credential provider passwords if found, if there is job specific password // the credential provider location is set directly in the execute method of LocalSparkClient // and submit method of RemoteHiveSparkClient when the job config is created http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 54d2cec..8d79dd9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.spark.session; import java.io.IOException; import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ 
-174,4 +175,9 @@ public class SparkSessionImpl implements SparkSession { public static String makeSessionId() { return UUID.randomUUID().toString(); } + + @VisibleForTesting + HiveSparkClient getHiveSparkClient() { + return hiveSparkClient; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java index 489383b..47d2437 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java @@ -27,7 +27,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient; +import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.spark.SparkConf; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -97,6 +103,51 @@ public class TestSparkSessionManagerImpl { sessionManagerHS2.shutdown(); } + /** + * Test HIVE-16395 - by default we force cloning of Configurations for Spark jobs + */ + @Test + public void testForceConfCloning() throws Exception { + HiveConf conf = new HiveConf(); + conf.set("spark.master", "local"); + String sparkCloneConfiguration = HiveSparkClientFactory.SPARK_CLONE_CONFIGURATION; + + // Clear the value of sparkCloneConfiguration + conf.unset(sparkCloneConfiguration); + assertNull( "Could not clear " + sparkCloneConfiguration + " in HiveConf", + 
conf.get(sparkCloneConfiguration)); + + // By default we should set sparkCloneConfiguration to true in the Spark config + checkSparkConf(conf, sparkCloneConfiguration, "true"); + + // User can override value for sparkCloneConfiguration in Hive config to false + conf.set(sparkCloneConfiguration, "false"); + checkSparkConf(conf, sparkCloneConfiguration, "false"); + + // User can override value of sparkCloneConfiguration in Hive config to true + conf.set(sparkCloneConfiguration, "true"); + checkSparkConf(conf, sparkCloneConfiguration, "true"); + } + + /** + * Force a Spark config to be generated and check that a config value has the expected value + * @param conf the Hive config to use as a base + * @param paramName the Spark config name to check + * @param expectedValue the expected value in the Spark config + */ + private void checkSparkConf(HiveConf conf, String paramName, String expectedValue) throws HiveException { + SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance(); + SparkSessionImpl sparkSessionImpl = (SparkSessionImpl) + sessionManager.getSession(null, conf, true); + assertTrue(sparkSessionImpl.isOpen()); + HiveSparkClient hiveSparkClient = sparkSessionImpl.getHiveSparkClient(); + SparkConf sparkConf = hiveSparkClient.getSparkConf(); + String cloneConfig = sparkConf.get(paramName); + sessionManager.closeSession(sparkSessionImpl); + assertEquals(expectedValue, cloneConfig); + sessionManager.shutdown(); + } + /* Thread simulating a user session in HiveServer2. */ public class SessionThread implements Runnable {