DRILL-653: Fix serialization of HiveSubScan
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5770568d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5770568d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5770568d

Branch: refs/heads/master
Commit: 5770568d88a6a219a854ca5c72f2e6c19164845a
Parents: 8a37b94
Author: Mehant Baid <[email protected]>
Authored: Wed May 14 18:52:04 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Fri May 16 13:35:04 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/store/hive/HiveScan.java | 2 +-
 .../exec/store/hive/HiveScanBatchCreator.java | 6 +-
 .../drill/exec/store/hive/HiveSubScan.java | 81 +++++++++++++-------
 .../apache/drill/jdbc/test/TestJdbcQuery.java | 1 +
 4 files changed, 57 insertions(+), 33 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5770568d/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index ed5a6cc..c6105ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -203,7 +203,7 @@ public class HiveScan extends AbstractGroupScan {
         splitTypes.add(split.getClass().getCanonicalName());
       }
       if (parts.contains(null)) parts = null;
-      return new HiveSubScan(table, parts, encodedInputSplits, splitTypes, columns);
+      return new HiveSubScan(encodedInputSplits, hiveReadEntry, splitTypes, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5770568d/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 62f2ec7..3dc9ac4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -37,9 +37,9 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
   @Override
   public RecordBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
     List<RecordReader> readers = Lists.newArrayList();
-    Table table = config.table;
+    Table table = config.getTable();
     List<InputSplit> splits = config.getInputSplits();
-    List<Partition> partitions = config.partitions;
+    List<Partition> partitions = config.getPartitions();
     if (partitions == null || partitions.size() == 0) {
       if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
         table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
@@ -62,7 +62,7 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
         }
       } else {
         for (InputSplit split : splits) {
-          readers.add(new HiveRecordReader(config.table, partitions.get(i++), split, config.getColumns(), context));
+          readers.add(new HiveRecordReader(config.getTable(), partitions.get(i++), split, config.getColumns(), context));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5770568d/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 0a02097..7f2d0f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -17,11 +17,12 @@
  */
 package org.apache.drill.exec.store.hive;
 
-import com.beust.jcommander.internal.Lists;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteStreams;
 import org.apache.commons.codec.binary.Base64;
@@ -39,39 +40,69 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
+@JsonTypeName("hive-sub-scan")
 public class HiveSubScan extends AbstractBase implements SubScan {
 
-  @JsonProperty("splits")
-  public List<String> encodedSplits;
-  @JsonProperty("hive-table")
-  public Table table;
-  @JsonProperty("partitions")
-  public List<Partition> partitions;
+  private List<String> splits;
+
+  private HiveReadEntry hiveReadEntry;
+
+  private List<String> splitClasses;
+
+  private List<SchemaPath> columns;
+
   @JsonIgnore
   private List<InputSplit> inputSplits = Lists.newArrayList();
-  @JsonProperty("splitClass")
-  public List<String> splitClasses;
-
-  @JsonProperty("columns")
-  public List<SchemaPath> columns;
+  @JsonIgnore
+  private Table table;
+  @JsonIgnore
+  private List<Partition> partitions;
 
   @JsonCreator
-  public HiveSubScan(@JsonProperty("hive-table") Table table,
-                     @JsonProperty("partition") List<Partition> partitions,
-                     @JsonProperty("splits") List<String> encodedSplits,
+  public HiveSubScan(@JsonProperty("splits") List<String> splits,
+                     @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns) throws IOException, ReflectiveOperationException {
-    this.table = table;
-    this.partitions = partitions;
-    this.encodedSplits = encodedSplits;
+    this.hiveReadEntry = hiveReadEntry;
+    this.table = hiveReadEntry.getTable();
+    this.partitions = hiveReadEntry.getPartitions();
+    this.splits = splits;
     this.splitClasses = splitClasses;
     this.columns = columns;
 
-    for (int i = 0; i < encodedSplits.size(); i++) {
-      inputSplits.add(deserializeInputSplit(encodedSplits.get(i), splitClasses.get(i)));
+    for (int i = 0; i < splits.size(); i++) {
+      inputSplits.add(deserializeInputSplit(splits.get(i), splitClasses.get(i)));
     }
   }
 
+  public List<String> getSplits() {
+    return splits;
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public List<Partition> getPartitions() {
+    return partitions;
+  }
+
+  public List<String> getSplitClasses() {
+    return splitClasses;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  public List<InputSplit> getInputSplits() {
+    return inputSplits;
+  }
+
+  public HiveReadEntry getHiveReadEntry() {
+    return hiveReadEntry;
+  }
+
   public static InputSplit deserializeInputSplit(String base64, String className) throws IOException, ReflectiveOperationException{
     InputSplit split;
     if (Class.forName(className) == FileSplit.class) {
@@ -84,14 +115,6 @@ public class HiveSubScan extends AbstractBase implements SubScan {
     return split;
   }
 
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-
-  public List<InputSplit> getInputSplits() {
-    return inputSplits;
-  }
-
   @Override
   public OperatorCost getCost() {
     return new OperatorCost(1, 2, 1, 1);
@@ -111,7 +134,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     try {
-      return new HiveSubScan(table, partitions, encodedSplits, splitClasses, columns);
+      return new HiveSubScan(splits, hiveReadEntry, splitClasses, columns);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5770568d/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index 2fd23d3..191aa0a 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -74,6 +74,7 @@ public class TestJdbcQuery extends JdbcTest{
   @Test
   public void testHiveReadWithDb() throws Exception{
     testQuery("select * from hive.`default`.kv");
+    testQuery("select key from hive.`default`.kv group by key");
   }
 
 
