Repository: hive Updated Branches: refs/heads/branch-3 2048f6262 -> 1ce6c7c2a
HIVE-20649: LLAP aware memory manager for Orc writers (Prasanth Jayachandran reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1ce6c7c2 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1ce6c7c2 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1ce6c7c2 Branch: refs/heads/branch-3 Commit: 1ce6c7c2a3a93f0f92078ba8c929a870eaa8134d Parents: 2048f62 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Sun Oct 14 21:34:08 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Sun Oct 14 21:34:42 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 ++ .../apache/hadoop/hive/ql/io/orc/OrcFile.java | 48 +++++++++++++++++++- .../hadoop/hive/ql/io/orc/TestOrcFile.java | 41 +++++++++++++++++ 3 files changed, 91 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1ce6c7c2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3852d79..a04ef38 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1923,6 +1923,10 @@ public class HiveConf extends Configuration { " ETL strategy is used when spending little more time in split generation is acceptable" + " (split generation reads and caches file footers). HYBRID chooses between the above strategies" + " based on heuristics."), + HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED("hive.exec.orc.writer.llap.memory.manager.enabled", true, + "Whether orc writers should use llap-aware memory manager. LLAP aware memory manager will use memory\n" + + "per executor instead of entire heap memory when concurrent orc writers are involved. This will let\n" + + "task fragments to use memory within its limit (memory per executor) when performing ETL in LLAP."), // hive streaming ingest settings HIVE_STREAMING_AUTO_FLUSH_ENABLED("hive.streaming.auto.flush.enabled", true, "Whether to enable memory \n" + http://git-wip-us.apache.org/repos/asf/hive/blob/1ce6c7c2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index e7dfb05..e246ac2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -24,20 +24,29 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.orc.FileMetadata; +import org.apache.orc.OrcConf; import org.apache.orc.PhysicalWriter; import org.apache.orc.MemoryManager; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.MemoryManagerImpl; import org.apache.orc.impl.OrcTail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; /** * Contains factory methods to read or write ORC files. */ public final class OrcFile extends org.apache.orc.OrcFile { - + private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class); // unused protected OrcFile() {} @@ -96,6 +105,37 @@ public final class OrcFile extends org.apache.orc.OrcFile { return new ReaderImpl(path, options); } + @VisibleForTesting + static class LlapAwareMemoryManager extends MemoryManagerImpl { + private final double maxLoad; + private final long totalMemoryPool; + + public LlapAwareMemoryManager(Configuration conf) { + super(conf); + maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); + long memPerExecutor = LlapDaemonInfo.INSTANCE.getMemoryPerExecutor(); + totalMemoryPool = (long) (memPerExecutor * maxLoad); + if (LOG.isDebugEnabled()) { + LOG.debug("Using LLAP memory manager for orc writer. memPerExecutor: {} maxLoad: {} totalMemPool: {}", + LlapUtil.humanReadableByteCount(memPerExecutor), maxLoad, LlapUtil.humanReadableByteCount(totalMemoryPool)); + } + } + + @Override + public long getTotalMemoryPool() { + return totalMemoryPool; + } + } + + private static ThreadLocal<MemoryManager> threadLocalOrcLlapMemoryManager = null; + + private static synchronized MemoryManager getThreadLocalOrcLlapMemoryManager(final Configuration conf) { + if (threadLocalOrcLlapMemoryManager == null) { + threadLocalOrcLlapMemoryManager = ThreadLocal.withInitial(() -> new LlapAwareMemoryManager(conf)); + } + return threadLocalOrcLlapMemoryManager.get(); + } + /** * Options for creating ORC file writers. */ @@ -111,6 +151,10 @@ public final class OrcFile extends org.apache.orc.OrcFile { WriterOptions(Properties tableProperties, Configuration conf) { super(tableProperties, conf); useUTCTimestamp(true); + if (conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, true) && + LlapProxy.isDaemon()) { + memory(getThreadLocalOrcLlapMemoryManager(conf)); + } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/1ce6c7c2/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java index 97d4fc6..2931c04 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java @@ -19,12 +19,14 @@ package org.apache.hadoop.hive.ql.io.orc; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -43,6 +45,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; @@ -91,6 +96,7 @@ import org.apache.orc.StringColumnStatistics; import org.apache.orc.StripeInformation; import org.apache.orc.StripeStatistics; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.MemoryManagerImpl; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -2202,4 +2208,39 @@ public class TestOrcFile { assertEquals(false, reader.hasNext()); reader.close(); } + + @Test + public void testLlapAwareMemoryManager() throws IOException { + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + try { + OrcFile.WriterOptions opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB); + Writer writer = OrcFile.createWriter(new Path(testFilePath, "-0"), opts); + writer.close(); + assertEquals(opts.getMemoryManager().getClass(), MemoryManagerImpl.class); + + conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap"); + LlapDaemonInfo.initialize("test", new Configuration()); + LlapProxy.setDaemon(true); + opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB); + writer = OrcFile.createWriter(new Path(testFilePath, "-1"), opts); + writer.close(); + assertEquals(opts.getMemoryManager().getClass(), OrcFile.LlapAwareMemoryManager.class); + assertEquals(LlapDaemonInfo.INSTANCE.getMemoryPerExecutor() * 0.5, + ((OrcFile.LlapAwareMemoryManager) opts.getMemoryManager()).getTotalMemoryPool(), 100); + + conf.setBoolean(HiveConf.ConfVars.HIVE_ORC_WRITER_LLAP_MEMORY_MANAGER_ENABLED.varname, false); + opts = OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB); + writer = OrcFile.createWriter(new Path(testFilePath, "-2"), opts); + writer.close(); + assertEquals(opts.getMemoryManager().getClass(), MemoryManagerImpl.class); + } finally { + LlapProxy.setDaemon(false); + conf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "container"); + } + } }