HIVE-13594: Misc cleanup on llap branch

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53b43cd4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53b43cd4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53b43cd4

Branch: refs/heads/master
Commit: 53b43cd440a2ee32efa5ad62f2684a4578f390ca
Parents: e69bd1e
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Apr 22 14:33:36 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Apr 22 14:33:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 --
 .../test/resources/testconfiguration.properties |  3 +-
 jdbc/pom.xml                                    | 16 +++----
 .../hadoop/hive/llap/LlapBaseRecordReader.java  |  2 +-
 .../ext/LlapTaskUmbilicalExternalClient.java    |  4 +-
 .../org/apache/hadoop/hive/llap/LlapDump.java   |  3 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  3 --
 .../hive/ql/exec/SerializationUtilities.java    |  2 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java    | 23 ++++-----
 .../hive/ql/optimizer/SimpleFetchOptimizer.java | 50 +++++++++++---------
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  6 +--
 11 files changed, 51 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fa724ae..c8c26db 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2763,9 +2763,6 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
         "LLAP daemon output service port"),
 
-    LLAP_TMP_SUBMITWORK_USING_TEZ_AM("hive.llap.tmp.submit.work.using.tez.am", true,""),
-    LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS("hive.llap.tmp.ext.client.num.server.handlers", 1, ""),
-
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 1669b9c..e46e6ce 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -503,8 +503,7 @@ minillap.shared.query.files=bucket_map_join_tez1.q,\
   vectorized_dynamic_partition_pruning.q,\
   tez_multi_union.q,\
   tez_join.q,\
-  tez_union_multiinsert.q,\
-  udtf_get_splits.q
+  tez_union_multiinsert.q
 
 
 minillap.query.files=llap_udf.q

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index c99a351..f87ab59 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -42,13 +42,14 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
       <artifactId>hive-service</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -70,11 +71,6 @@
       <artifactId>hive-service-rpc</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <!-- inter-project -->
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 7073280..0cd9672 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -191,7 +191,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
       ReaderEvent event = readerEvents.take();
       return event;
     } catch (InterruptedException ie) {
-      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
+      throw new RuntimeException("Interrupted while getting readerEvents, not expected: " + ie.getMessage(), ie);
     }
   }
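
The change above keeps the interrupt cause but now also surfaces its message. A self-contained sketch of the same idiom, with a stand-in queue instead of the readerEvents queue:

  // Illustrative sketch only; not LlapBaseRecordReader itself.
  import java.util.concurrent.LinkedBlockingQueue;

  public class TakeSketch {
    private final LinkedBlockingQueue<String> events = new LinkedBlockingQueue<>();

    public String nextEvent() {
      try {
        return events.take();
      } catch (InterruptedException ie) {
        // Wrap the cause and include its message, as the patched code now does.
        throw new RuntimeException("Interrupted while getting events, not expected: "
            + ie.getMessage(), ie);
      }
    }
  }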
 

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 7d06637..8598bc8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -102,8 +102,8 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
 
   @Override
   public void serviceStart() throws IOException {
-    int numHandlers = HiveConf.getIntVar(conf,
-        HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
+    // If we use a single server for multiple external clients, then consider using more than one handler.
+    int numHandlers = 1;
     llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
     communicator.start();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
index 1c4397f..d485bfa 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
@@ -40,7 +40,6 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
@@ -74,7 +73,7 @@ public class LlapDump {
 
     if (cli.hasOption('h')) {
       HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("orcfiledump", opts);
+      formatter.printHelp("llapdump", opts);
       return;
     }
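
For context, Commons CLI prints usage under whatever command name is passed to printHelp, so the help text now matches the tool name. A small hedged sketch (the option set here is made up):

  // Illustrative sketch only; not the real LlapDump option parsing.
  import org.apache.commons.cli.HelpFormatter;
  import org.apache.commons.cli.Options;

  public class HelpSketch {
    public static void main(String[] args) {
      Options opts = new Options();
      opts.addOption("h", "help", false, "show help");
      new HelpFormatter().printHelp("llapdump", opts);
    }
  }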
 

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d2dfbb7..ec6381b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -36,13 +36,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -203,7 +201,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           }
         }
       }
-
       try {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index eaa4293..b05a79e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -570,7 +570,7 @@ public class SerializationUtilities {
    * @param plan Usually of type MapredWork, MapredLocalWork etc.
    * @param out stream in which serialized plan is written into
    */
-  public static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
+  private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) {
     Output output = new Output(out);
     kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader());
     kryo.writeObject(output, plan);
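
The now-private method follows the standard Kryo write pattern. A hedged, self-contained sketch of that pattern with a plain Kryo instance (Hive's pooled Kryo instances carry extra registrations, so real callers should go through the public entry points in SerializationUtilities, e.g. serializePlan):

  // Illustrative sketch only.
  import java.io.ByteArrayOutputStream;
  import com.esotericsoftware.kryo.Kryo;
  import com.esotericsoftware.kryo.io.Output;

  public class KryoWriteSketch {
    public static byte[] serialize(Object plan) {
      Kryo kryo = new Kryo();
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      Output output = new Output(bos);
      kryo.writeObject(output, plan);
      output.close(); // flush buffered bytes into the underlying stream
      return bos.toByteArray();
    }
  }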

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index b16368f..4e6272f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -78,17 +78,16 @@ public class HiveSplitGenerator extends InputInitializer {
 
  private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class);
 
-  private DynamicPartitionPruner pruner = null;
-  private Configuration conf = null;
-  private JobConf jobConf = null;
-  private MRInputUserPayloadProto userPayloadProto = null;
-  private MapWork work = null;
+  private final DynamicPartitionPruner pruner;
+  private final Configuration conf;
+  private final JobConf jobConf;
+  private final MRInputUserPayloadProto userPayloadProto;
+  private final MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
-  private SplitLocationProvider splitLocationProvider = null;
+  private final SplitLocationProvider splitLocationProvider;
 
-  
-  // TODO RSHACK This entire method needs to be reworked. Put back final fields, separate into reusable components etc.
-  public void initializeSplitGenerator(Configuration conf, MapWork work) throws IOException {
+  public HiveSplitGenerator(Configuration conf, MapWork work) throws IOException {
+    super(null);
 
     this.conf = conf;
     this.work = work;
@@ -103,8 +102,6 @@ public class HiveSplitGenerator extends InputInitializer {
     // Read all credentials into the credentials instance stored in JobConf.
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
-    this.work = Utilities.getMapWork(jobConf);
-
     // Events can start coming in the moment the InputInitializer is created. The pruner
     // must be setup and initialized here so that it sets up it's structures to start accepting events.
     // Setting it up in initialize leads to a window where events may come in before the pruner is
@@ -116,9 +113,7 @@ public class HiveSplitGenerator extends InputInitializer {
   public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
       SerDeException {
     super(initializerContext);
-    if (initializerContext == null) {
-      return;
-    }
+
     Preconditions.checkNotNull(initializerContext);
     userPayloadProto =
         MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());
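
With initializeSplitGenerator folded into a constructor, external callers construct and then initialize in one flow, as GenericUDTFGetSplits now does (see the last hunk below). A hedged usage sketch:

  // Illustrative sketch only; jobConf and mapWork are assumed to be already populated.
  import java.util.List;
  import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
  import org.apache.hadoop.hive.ql.plan.MapWork;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.tez.runtime.api.Event;

  public class SplitGenSketch {
    static List<Event> generateSplits(JobConf jobConf, MapWork mapWork) throws Exception {
      // The constructor now wires up conf, work and the dynamic partition pruner.
      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(jobConf, mapWork);
      return splitGenerator.initialize();
    }
  }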

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index ca8dccf..b5ceb14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -141,31 +140,35 @@ public class SimpleFetchOptimizer extends Transform {
   }
 
   private boolean checkThreshold(FetchData data, int limit, ParseContext pctx) throws Exception {
-    boolean result = false;
-
     if (limit > 0) {
       if (data.hasOnlyPruningFilter()) {
         /* partitioned table + query has only pruning filters */
-        result = true;
+        return true;
       } else if (data.isPartitioned() == false && data.isFiltered() == false) {
         /* unpartitioned table + no filters */
-        result = true;
+        return true;
       }
       /* fall through */
-    } else {
-      long threshold = HiveConf.getLongVar(pctx.getConf(),
-         HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
-      if (threshold < 0) {
-       result = true;
-      } else {
-       long remaining = threshold;
-       remaining -= data.getInputLength(pctx, remaining);
-       if (remaining >= 0) {
-         result = true;
-       }
+    }
+    long threshold = HiveConf.getLongVar(pctx.getConf(),
+        HiveConf.ConfVars.HIVEFETCHTASKCONVERSIONTHRESHOLD);
+    if (threshold < 0) {
+      return true;
+    }
+    Operator child = data.scanOp.getChildOperators().get(0);
+    if(child instanceof SelectOperator) {
+      // select *, constant and casts can be allowed without a threshold check
+      if (checkExpressions((SelectOperator)child)) {
+        return true;
       }
     }
-    return result;
+    long remaining = threshold;
+    remaining -= data.getInputLength(pctx, remaining);
+    if (remaining < 0) {
+      LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode");
+      return false;
+    }
+    return true;
   }
 
   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -184,20 +187,23 @@ public class SimpleFetchOptimizer extends Transform {
       return null;
     }
     Table table = ts.getConf().getTableMetadata();
+    if (table == null) {
+      return null;
+    }
     ReadEntity parent = PlanUtils.getParentViewInfo(alias, pctx.getViewAliasToInput());
-    if (table != null && !table.isPartitioned()) {
+    if (!table.isPartitioned()) {
       FetchData fetch = new FetchData(ts, parent, table, splitSample);
       return checkOperators(fetch, aggressive, false);
     }
 
     boolean bypassFilter = false;
-    if (table != null && HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) {
+    if (HiveConf.getBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVEOPTPPD)) {
       ExprNodeDesc pruner = pctx.getOpToPartPruner().get(ts);
       if (PartitionPruner.onlyContainsPartnCols(table, pruner)) {
         bypassFilter = !pctx.getPrunedPartitions(alias, ts).hasUnknownPartitions();
       }
     }
-    if (table != null && !aggressive && !bypassFilter) {
+    if (!aggressive && !bypassFilter) {
       return null;
     }
     PrunedPartitionList partitions = pctx.getPrunedPartitions(alias, ts);
@@ -225,7 +231,7 @@ public class SimpleFetchOptimizer extends Transform {
         continue;
       }
 
-      if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)) || op instanceof UDTFOperator) {
+      if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
         break;
       }
 
@@ -283,7 +289,7 @@ public class SimpleFetchOptimizer extends Transform {
 
   private boolean isConvertible(FetchData fetch, Operator<?> operator, Set<Operator<?>> traversed) {
     if (operator instanceof ReduceSinkOperator || operator instanceof CommonJoinOperator
-        || operator instanceof ScriptOperator) {
+        || operator instanceof ScriptOperator || operator instanceof UDTFOperator) {
       return false;
     }
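
To make the rewritten checkThreshold arithmetic above concrete: with a positive hive.fetch.task.conversion.threshold, getInputLength is asked for at most that many bytes; if the scanned input turns out larger, remaining goes negative and the fetch conversion is rejected. A hedged stand-alone sketch of that control flow (the sizes and the inputLength helper are invented for illustration):

  public class ThresholdSketch {
    // Pretend the scanned input is 150 MB regardless of the cap passed in.
    static long inputLength(long atMost) {
      return 150L * 1024 * 1024;
    }

    static boolean withinThreshold(long threshold) {
      if (threshold < 0) {
        return true; // a negative threshold disables the size check
      }
      long remaining = threshold - inputLength(threshold);
      return remaining >= 0;
    }

    public static void main(String[] args) {
      // 100 MB cap vs. 150 MB of input: conversion is rejected.
      System.out.println(withinThreshold(100L * 1024 * 1024)); // prints false
    }
  }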
 

http://git-wip-us.apache.org/repos/asf/hive/blob/53b43cd4/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 9a52c7d..2d36e5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -298,15 +298,13 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
 
       // we have the dag now proceed to get the splits:
-      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
               HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
               HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
-      splitGenerator.initializeSplitGenerator(wxConf, mapWork);
+      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
       List<Event> eventList = splitGenerator.initialize();
 
-      // hack - just serializing with kryo for now. This needs to be done properly
       InputSplit[] result = new InputSplit[eventList.size() - 1];
       DataOutputBuffer dob = new DataOutputBuffer();
 
@@ -458,7 +456,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         break;
       case VARCHAR:
         VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.CHAR, varcharTypeInfo.getLength());
+        typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR, varcharTypeInfo.getLength());
         break;
       case DATE:
         typeDesc = new TypeDesc(TypeDesc.Type.DATE);
