HIVE-11170 : port parts of HIVE-11015 to master for ease of future merging (Sergey Shelukhin, reviewed by Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d89a7d1e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d89a7d1e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d89a7d1e

Branch: refs/heads/beeline-cli
Commit: d89a7d1e7fe7fb51aeb514e4357ae149158b2a34
Parents: d314425
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Thu Jul 9 17:50:32 2015 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Thu Jul 9 17:50:32 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FilterOperator.java     |   3 +-
 .../hive/ql/exec/mr/ExecMapperContext.java      |  10 +-
 .../ql/io/HiveContextAwareRecordReader.java     |   2 +-
 .../org/apache/hadoop/hive/ql/io/IOContext.java |  43 ------
 .../apache/hadoop/hive/ql/io/IOContextMap.java  |  81 +++++++++++
 .../hadoop/hive/ql/exec/TestOperators.java      |   3 +-
 .../ql/io/TestHiveBinarySearchRecordReader.java |   2 +-
 .../hadoop/hive/ql/io/TestIOContextMap.java     | 133 +++++++++++++++++++
 8 files changed, 223 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index 65301c0..ae35766 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,7 +62,7 @@ public class FilterOperator extends Operator<FilterDesc> implements
       }
       conditionInspector = null;
-      ioContext = IOContext.get(hconf);
+      ioContext = IOContextMap.get(hconf);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 13d0650..fc5abfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
@@ -63,11 +63,11 @@ public class ExecMapperContext {
 
   public ExecMapperContext(JobConf jc) {
     this.jc = jc;
-    ioCxt = IOContext.get(jc);
+    ioCxt = IOContextMap.get(jc);
   }
 
   public void clear() {
-    IOContext.clear();
+    IOContextMap.clear();
     ioCxt = null;
   }
@@ -151,8 +151,4 @@ public class ExecMapperContext {
   public IOContext getIoCxt() {
     return ioCxt;
   }
-
-  public void setIoCxt(IOContext ioCxt) {
-    this.ioCxt = ioCxt;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 9b3f8ec..738ca9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -162,7 +162,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
   }
 
   public IOContext getIOContext() {
-    return IOContext.get(jobConf);
+    return IOContextMap.get(jobConf);
   }
 
   private void initIOContext(long startPos, boolean isBlockPointer,

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index ebad0a6..019db8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -18,13 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 
 /**
  * IOContext basically contains the position information of the current
@@ -35,43 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
  * nextBlockStart refers the end of current row and beginning of next row.
  */
 public class IOContext {
-
-  /**
-   * Spark uses this thread local
-   */
-  private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
-    @Override
-    protected IOContext initialValue() { return new IOContext(); }
-  };
-
-  private static IOContext get() {
-    return IOContext.threadLocal.get();
-  }
-
-  /**
-   * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
-   */
-  private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
-
-  public static IOContext get(Configuration conf) {
-    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      return get();
-    }
-    String inputName = conf.get(Utilities.INPUT_NAME);
-    if (!inputNameIOContextMap.containsKey(inputName)) {
-      IOContext ioContext = new IOContext();
-      inputNameIOContextMap.put(inputName, ioContext);
-    }
-
-    return inputNameIOContextMap.get(inputName);
-  }
-
-  public static void clear() {
-    IOContext.threadLocal.remove();
-    inputNameIOContextMap.clear();
-  }
-
   private long currentBlockStart;
   private long nextBlockStart;
   private long currentRow;

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
new file mode 100644
index 0000000..342c526
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * NOTE: before LLAP branch merge, there's no LLAP code here.
+ * There used to be a global static map of IOContext-s inside IOContext (Hive style!).
+ * Unfortunately, due to variety of factors, this is now a giant fustercluck.
+ * 1) Spark doesn't apparently care about multiple inputs, but has multiple threads, so one
+ *    threadlocal IOContext was added for it.
+ * 2) LLAP has lots of tasks in the same process so globals no longer cut it either.
+ * 3) However, Tez runs 2+ threads for one task (e.g. TezTaskEventRouter and TezChild), and these
+ *    surprisingly enough need the same context. Tez, in its infinite wisdom, doesn't allow them
+ *    to communicate in any way nor provide any shared context.
+ * So we are going to...
+ * 1) Keep the good ol' global map for MR and Tez. Hive style!
+ * 2) Keep the threadlocal for Spark. Hive style!
+ * 3) Create inheritable (TADA!) threadlocal with attemptId, only set in LLAP; that will propagate
+ *    to all the little Tez threads, and we will keep a map per attempt. Hive style squared!
+ */
+public class IOContextMap {
+  public static final String DEFAULT_CONTEXT = "";
+  private static final Log LOG = LogFactory.getLog(IOContextMap.class);
+
+  /** Used for Tez and MR */
+  private static final ConcurrentHashMap<String, IOContext> globalMap =
+      new ConcurrentHashMap<String, IOContext>();
+
+  /** Used for Spark */
+  private static final ThreadLocal<IOContext> sparkThreadLocal = new ThreadLocal<IOContext>(){
+    @Override
+    protected IOContext initialValue() { return new IOContext(); }
+  };
+
+  public static IOContext get(Configuration conf) {
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return sparkThreadLocal.get();
+    }
+    String inputName = conf.get(Utilities.INPUT_NAME);
+    if (inputName == null) {
+      inputName = DEFAULT_CONTEXT;
+    }
+    ConcurrentHashMap<String, IOContext> map;
+    map = globalMap;
+
+    IOContext ioContext = map.get(inputName);
+    if (ioContext != null) return ioContext;
+    ioContext = new IOContext();
+    IOContext oldContext = map.putIfAbsent(inputName, ioContext);
+    return (oldContext == null) ? ioContext : oldContext;
+  }
+
+  public static void clear() {
+    sparkThreadLocal.remove();
+    globalMap.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 62057d8..c3a36c0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -272,7 +273,7 @@ public class TestOperators extends TestCase {
       JobConf hconf = new JobConf(TestOperators.class);
       HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
           "hdfs:///testDir/testFile");
-      IOContext.get(hconf).setInputPath(
+      IOContextMap.get(hconf).setInputPath(
           new Path("hdfs:///testDir/testFile"));
 
       // initialize pathToAliases

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
index 7a1748c..9dc4f5b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
@@ -116,7 +116,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
 
   private void resetIOContext() {
     conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
-    ioContext = IOContext.get(conf);
+    ioContext = IOContextMap.get(conf);
     ioContext.setUseSorted(false);
     ioContext.setBinarySearching(false);
     ioContext.setEndBinarySearch(false);
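A note on item 3 of the NOTE in IOContextMap above: the inheritable attempt-id threadlocal is only described there, and this commit deliberately contains no LLAP code. What follows is a minimal hypothetical sketch of that third mode, assuming a made-up setThreadAttemptId() hook and per-attempt map; none of these names exist in this patch. The point it illustrates is that a value put into an InheritableThreadLocal before a task attempt spawns its worker threads is automatically visible in all of them, which is exactly what the multi-threaded Tez-in-LLAP case needs.

package org.apache.hadoop.hive.ql.io;

import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch only; not part of this commit.
public class AttemptIOContextMapSketch {
  // Set once per task attempt; threads started afterwards inherit the value.
  // null means "not in per-attempt mode", i.e. fall back to the global map.
  private static final InheritableThreadLocal<String> threadAttemptId =
      new InheritableThreadLocal<String>();

  // One IOContext map per attempt id (assumed shape of the per-attempt state).
  private static final ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>>
      attemptMaps = new ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>>();

  // Would be called when an attempt starts, before it creates its worker threads.
  public static void setThreadAttemptId(String attemptId) {
    threadAttemptId.set(attemptId);
  }

  // Every thread of the same attempt resolves to the same inner map.
  static ConcurrentHashMap<String, IOContext> getAttemptMap() {
    String attemptId = threadAttemptId.get();
    if (attemptId == null) return null; // caller should use the global map
    ConcurrentHashMap<String, IOContext> map = attemptMaps.get(attemptId);
    if (map == null) {
      map = new ConcurrentHashMap<String, IOContext>();
      ConcurrentHashMap<String, IOContext> old = attemptMaps.putIfAbsent(attemptId, map);
      if (old != null) {
        map = old; // another thread of this attempt created it first
      }
    }
    return map;
  }
}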
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
new file mode 100644
index 0000000..4469353
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestIOContextMap {
+
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testMRTezGlobalMap() throws Exception {
+    // Tests concurrent modification, and that results are the same per input across threads
+    // but different between inputs.
+    final int THREAD_COUNT = 2, ITER_COUNT = 1000;
+    final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
+    final CountDownLatch phase1End = new CountDownLatch(THREAD_COUNT);
+    final IOContext[] results = new IOContext[ITER_COUNT];
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+        public Void call() throws Exception {
+          Configuration conf = new Configuration();
+          syncThreadStart(cdlIn, cdlOut);
+          // Phase 1 - create objects.
+          while (true) {
+            int nextIx = countdown.decrementAndGet();
+            if (nextIx < 0) break;
+            conf.set(Utilities.INPUT_NAME, "Input " + nextIx);
+            results[nextIx] = IOContextMap.get(conf);
+            if (nextIx == 0) break;
+          }
+          phase1End.countDown();
+          phase1End.await();
+          // Phase 2 - verify we get the expected objects created by all threads.
+          for (int i = 0; i < ITER_COUNT; ++i) {
+            conf.set(Utilities.INPUT_NAME, "Input " + i);
+            IOContext ctx = IOContextMap.get(conf);
+            assertSame(results[i], ctx);
+          }
+          return null;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i].get();
+    }
+    Set<IOContext> resultSet = Sets.newIdentityHashSet();
+    for (int i = 0; i < results.length; ++i) {
+      assertTrue(resultSet.add(results[i])); // All the objects must be different.
+    }
+  }
+
+  @Test
+  public void testSparkThreadLocal() throws Exception {
+    // Test that input name does not change IOContext returned, and that each thread gets its own.
+    final Configuration conf1 = new Configuration();
+    conf1.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
+    final Configuration conf2 = new Configuration(conf1);
+    conf2.set(Utilities.INPUT_NAME, "Other input");
+    final int THREAD_COUNT = 2;
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<IOContext>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<IOContext>(new Callable<IOContext>() {
+        public IOContext call() throws Exception {
+          syncThreadStart(cdlIn, cdlOut);
+          IOContext c1 = IOContextMap.get(conf1), c2 = IOContextMap.get(conf2);
+          assertSame(c1, c2);
+          return c1;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    Set<IOContext> results = Sets.newIdentityHashSet();
+    for (int i = 0; i < tasks.length; ++i) {
+      assertTrue(results.add(tasks[i].get())); // All the objects must be different.
+    }
+  }
+
+}
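For readers following along, the contract the new tests pin down can also be exercised from a few lines of standalone client code. The sketch below is not part of the commit: it assumes the patched hive-exec on the classpath, the class name is made up, and it should be run with java -ea so the asserts are enabled.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.io.IOContextMap;

public class IOContextMapContractCheck {
  public static void main(String[] args) throws Exception {
    // MR/Tez path: the same input name always resolves to the same shared IOContext.
    Configuration conf = new Configuration();
    conf.set(Utilities.INPUT_NAME, "input_a");
    IOContext a1 = IOContextMap.get(conf);
    IOContext a2 = IOContextMap.get(conf);
    assert a1 == a2 : "same input name must map to one IOContext";

    // ...and a different input name resolves to a different one.
    conf.set(Utilities.INPUT_NAME, "input_b");
    assert IOContextMap.get(conf) != a1 : "different input names must not share state";

    // Spark path: the context is per-thread, so the input name is irrelevant.
    final Configuration sparkConf = new Configuration();
    sparkConf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
    final IOContext[] fromOtherThread = new IOContext[1];
    Thread t = new Thread(new Runnable() {
      public void run() {
        fromOtherThread[0] = IOContextMap.get(sparkConf);
      }
    });
    t.start();
    t.join();
    assert IOContextMap.get(sparkConf) != fromOtherThread[0]
        : "each thread must get its own IOContext under Spark";
    System.out.println("IOContextMap contract holds");
  }
}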