HIVE-13594: Misc cleanup on llap branch

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53b43cd4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53b43cd4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53b43cd4

Branch: refs/heads/master
Commit: 53b43cd440a2ee32efa5ad62f2684a4578f390ca
Parents: e69bd1e
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Apr 22 14:33:36 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Apr 22 14:33:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 --
 .../test/resources/testconfiguration.properties |  3 +-
 jdbc/pom.xml                                    | 16 +++----
 .../hadoop/hive/llap/LlapBaseRecordReader.java  |  2 +-
 .../ext/LlapTaskUmbilicalExternalClient.java    |  4 +-
 .../org/apache/hadoop/hive/llap/LlapDump.java   |  3 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  3 --
 .../hive/ql/exec/SerializationUtilities.java    |  2 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java    | 23 ++++-----
 .../hive/ql/optimizer/SimpleFetchOptimizer.java | 50 +++++++++++---------
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  6 +--
 11 files changed, 51 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fa724ae..c8c26db 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2763,9 +2763,6 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
         "LLAP daemon output service port"),

-    LLAP_TMP_SUBMITWORK_USING_TEZ_AM("hive.llap.tmp.submit.work.using.tez.am", true,""),
-    LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS("hive.llap.tmp.ext.client.num.server.handlers", 1, ""),
-
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
         "60s", new TimeValidator(TimeUnit.SECONDS),
         "Timeout for requests from Hive client to remote Spark driver."),

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 1669b9c..e46e6ce 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -503,8 +503,7 @@ minillap.shared.query.files=bucket_map_join_tez1.q,\
   vectorized_dynamic_partition_pruning.q,\
   tez_multi_union.q,\
   tez_join.q,\
-  tez_union_multiinsert.q,\
-  udtf_get_splits.q
+  tez_union_multiinsert.q

 minillap.query.files=llap_udf.q

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index c99a351..f87ab59 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -42,13 +42,14 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
       <artifactId>hive-service</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -70,11 +71,6 @@
       <artifactId>hive-service-rpc</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <!-- inter-project -->
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 7073280..0cd9672 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -191,7 +191,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
       ReaderEvent event = readerEvents.take();
       return event;
     } catch (InterruptedException ie) {
-      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
+      throw new RuntimeException("Interrupted while getting readerEvents, not expected: " + ie.getMessage(), ie);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 7d06637..8598bc8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -102,8 +102,8 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {

   @Override
   public void serviceStart() throws IOException {
-    int numHandlers = HiveConf.getIntVar(conf,
-        HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
+    // If we use a single server for multiple external clients, then consider using more than one handler.
+    int numHandlers = 1;
     llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
     communicator.start();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
index 1c4397f..d485bfa 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
@@ -40,7 +40,6 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
@@ -74,7 +73,7 @@ public class LlapDump {

     if (cli.hasOption('h')) {
       HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("orcfiledump", opts);
+      formatter.printHelp("llapdump", opts);
       return;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d2dfbb7..ec6381b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -36,13 +36,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -203,7 +201,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
       }
     }
-
     try {
       for (int i = 0; i < updaters.length; i++) {
         if (updaters[i] != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index eaa4293..b05a79e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -570,7 +570,7 @@ public class SerializationUtilities {
    * @param plan Usually of type MapredWork, MapredLocalWork etc.
    * @param out stream in which serialized plan is written into
    */
-  public static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
+  private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
     Output output = new Output(out);
     kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader());
     kryo.writeObject(output, plan);

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index b16368f..4e6272f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -78,17 +78,16 @@ public class HiveSplitGenerator extends InputInitializer {

   private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class);

-  private DynamicPartitionPruner pruner = null;
-  private Configuration conf = null;
-  private JobConf jobConf = null;
-  private MRInputUserPayloadProto userPayloadProto = null;
-  private MapWork work = null;
+  private final DynamicPartitionPruner pruner;
+  private final Configuration conf;
+  private final JobConf jobConf;
+  private final MRInputUserPayloadProto userPayloadProto;
+  private final MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
-  private SplitLocationProvider splitLocationProvider = null;
+  private final SplitLocationProvider splitLocationProvider;

-
-  // TODO RSHACK This entire method needs to be reworked. Put back final fields, separate into reusable components etc.
-  public void initializeSplitGenerator(Configuration conf, MapWork work) throws IOException {
+  public HiveSplitGenerator(Configuration conf, MapWork work) throws IOException {
+    super(null);

     this.conf = conf;
     this.work = work;
@@ -103,8 +102,6 @@
     // Read all credentials into the credentials instance stored in JobConf.
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);

-    this.work = Utilities.getMapWork(jobConf);
-
     // Events can start coming in the moment the InputInitializer is created. The pruner
     // must be setup and initialized here so that it sets up it's structures to start accepting events.
     // Setting it up in initialize leads to a window where events may come in before the pruner is
@@ -116,9 +113,7 @@
   public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
       SerDeException {
     super(initializerContext);
-    if (initializerContext == null) {
-      return;
-    }
+    Preconditions.checkNotNull(initializerContext);

     userPayloadProto =
         MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index ca8dccf..b5ceb14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -141,31 +140,35 @@ public class SimpleFetchOptimizer extends Transform {
   }

   private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) throws Exception {
-    boolean result = false;
-
     if (limit > 0) {
       if (data.hasOnlyPruningFilter()) {
         /* partitioned table + query has only pruning filters */
-        result = true;
+        return true;
       } else if (data.isPartitioned() == false && data.isFiltered() == false) {
         /* unpartitioned table + no filters */
-        result = true;
+        return true;
       }
       /* fall through */
-    } else {
-      long threshold = HiveConf.getLongVar(pctx.getConf(),
-          HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
-      if (threshold < 0) {
-        result = true;
-      } else {
-        long remaining = threshold;
-        remaining -= data.getInputLength(pctx, remaining);
-        if (remaining >= 0) {
-          result = true;
-        }
+    }
+    long threshold = HiveConf.getLongVar(pctx.getConf(),
+        HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
+    if (threshold < 0) {
+      return true;
+    }
+    Operator child = data.scanOp.getChildOperators().get(0);
+    if(child instanceof SelectOperator) {
+      // select *, constant and casts can be allowed without a threshold check
+      if (checkExpressions((SelectOperator)child)) {
+        return true;
       }
     }
-    return result;
+    long remaining = threshold;
+    remaining -= data.getInputLength(pctx, remaining);
+    if (remaining < 0) {
+      LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode");
+      return false;
+    }
+    return true;
   }

   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -184,20 +187,23 @@
       return null;
     }
     Table table = ts.getConf().getTableMetadata();
+    if (table == null) {
+      return null;
+    }
     ReadEntity parent = PlanUtils.getParentViewInfo(alias, pctx.getViewAliasToInput());
-    if (table != null && !table.isPartitioned()) {
+    if (!table.isPartitioned()) {
       FetchData fetch = new FetchData(ts, parent, table, splitSample);
       return checkOperators(fetch, aggressive, false);
     }

     boolean bypassFilter = false;
-    if (table != null && HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) {
+    if (HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) {
       ExprNodeDesc pruner = pctx.getOpToPartPruner().get(ts);
       if (PartitionPruner.onlyContainsPartnCols(table, pruner)) {
         bypassFilter = !pctx.getPrunedPartitions(alias, ts).hasUnknownPartitions();
       }
     }
-    if (table != null && !aggressive && !bypassFilter) {
+    if (!aggressive && !bypassFilter) {
       return null;
     }
     PrunedPartitionList partitions = pctx.getPrunedPartitions(alias, ts);
@@ -225,7 +231,7 @@
         continue;
       }

-      if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)) || op instanceof UDTFOperator) {
+      if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
        break;
       }

@@ -283,7 +289,7 @@

   private boolean isConvertible(FetchData fetch, Operator<?> operator, Set<Operator<?>> traversed) {
     if (operator instanceof ReduceSinkOperator || operator instanceof CommonJoinOperator
-        || operator instanceof ScriptOperator) {
+        || operator instanceof ScriptOperator || operator instanceof UDTFOperator) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 9a52c7d..2d36e5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -298,15 +298,13 @@ public class GenericUDTFGetSplits extends GenericUDTF {

     // we have the dag now proceed to get the splits:
-    HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
     Preconditions.checkState(HiveConf.getBoolVar(wxConf,
         HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
     Preconditions.checkState(HiveConf.getBoolVar(wxConf,
         HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
-    splitGenerator.initializeSplitGenerator(wxConf, mapWork);
+    HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
     List<Event> eventList = splitGenerator.initialize();

-    // hack - just serializing with kryo for now. This needs to be done properly
     InputSplit[] result = new InputSplit[eventList.size() - 1];

     DataOutputBuffer dob = new DataOutputBuffer();
@@ -458,7 +456,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         break;
       case VARCHAR:
         VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.CHAR, varcharTypeInfo.getLength());
+        typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR, varcharTypeInfo.getLength());
         break;
       case DATE:
         typeDesc = new TypeDesc(TypeDesc.Type.DATE);