Repository: drill Updated Branches: refs/heads/master fe8471316 -> fe79a633a
DRILL-5830: Resolve regressions to MapR DB from DRILL-5546 - Back out HBase changes - Code cleanup - Test utilities - Fix for DRILL-5829 closes #968 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/42f7af22 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/42f7af22 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/42f7af22 Branch: refs/heads/master Commit: 42f7af22fc5d713aac07e057fd374ccd674e40df Parents: fe84713 Author: Paul Rogers <prog...@maprtech.com> Authored: Thu Sep 28 09:49:38 2017 -0700 Committer: Paul Rogers <prog...@maprtech.com> Committed: Wed Oct 11 13:14:56 2017 -0700 ---------------------------------------------------------------------- .../drill/exec/store/hbase/HBaseGroupScan.java | 95 ++--- .../exec/store/hbase/HBaseRecordReader.java | 6 +- .../drill/exec/store/hbase/HBaseScanSpec.java | 1 - .../exec/store/hbase/HBaseSchemaFactory.java | 4 +- .../exec/store/hbase/HBaseStoragePlugin.java | 4 +- .../drill/exec/store/hbase/HBaseSubScan.java | 9 +- .../org/apache/drill/hbase/BaseHBaseTest.java | 2 +- .../exec/client/PrintingResultsListener.java | 55 +-- .../exec/physical/config/UnionExchange.java | 7 +- .../UnorderedReceiverBatch.java | 5 +- .../UnorderedReceiverCreator.java | 6 +- .../planner/logical/DrillPushProjIntoScan.java | 1 - .../exec/planner/physical/ProjectPrel.java | 2 +- .../apache/drill/exec/record/BatchSchema.java | 15 + .../drill/exec/record/RecordBatchLoader.java | 63 +++- .../drill/exec/record/VectorContainer.java | 2 +- .../org/apache/drill/exec/util/VectorUtil.java | 27 +- .../work/foreman/rm/DistributedQueryQueue.java | 3 + .../drill/exec/record/vector/TestLoad.java | 358 +++++++++++++++++-- .../org/apache/drill/test/QueryBuilder.java | 4 + .../apache/drill/test/QueryRowSetIterator.java | 118 ++++++ .../src/main/java/io/netty/buffer/DrillBuf.java | 6 +- .../drill/exec/record/MaterializedField.java | 53 +++ 23 files changed, 691 insertions(+), 155 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 1ee1da8..2b8cf18 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,17 +17,22 @@ */ package org.apache.drill.exec.store.hbase; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + import 
org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -50,24 +55,18 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.util.Bytes; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; @JsonTypeName("hbase-scan") public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants { @@ -144,8 +143,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst @Override public GroupScan clone(List<SchemaPath> columns) { HBaseGroupScan newScan = new HBaseGroupScan(this); - newScan.columns = columns == null ? ALL_COLUMNS : columns;; - newScan.verifyColumnsAndConvertStar(); + newScan.columns = columns == null ? 
ALL_COLUMNS : columns; + newScan.verifyColumns(); return newScan; } @@ -177,37 +176,19 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst } catch (IOException e) { throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e); } - verifyColumnsAndConvertStar(); + verifyColumns(); } - private void verifyColumnsAndConvertStar() { - boolean hasStarCol = false; - LinkedHashSet<SchemaPath> requestedColumns = new LinkedHashSet<>(); - + private void verifyColumns() { + if (Utilities.isStarQuery(columns)) { + return; + } for (SchemaPath column : columns) { - // convert * into [row_key, cf1, cf2, ..., cf_n]. - if (column.equals(Utilities.STAR_COLUMN)) { - hasStarCol = true; - Set<byte[]> families = hTableDesc.getFamiliesKeys(); - requestedColumns.add(ROW_KEY_PATH); - for (byte[] family : families) { - SchemaPath colFamily = SchemaPath.getSimplePath(Bytes.toString(family)); - requestedColumns.add(colFamily); - } - } else { - if (!(column.equals(ROW_KEY_PATH) || - hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { - DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", - column.getRootSegment().getPath(), hTableDesc.getNameAsString()); - } - requestedColumns.add(column); + if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { + DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", + column.getRootSegment().getPath(), hTableDesc.getNameAsString()); } } - - // since star column has been converted, reset this.cloumns. 
- if (hasStarCol) { - this.columns = new ArrayList<>(requestedColumns); - } } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index d6c02b5..cae7ce4 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -126,12 +126,10 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter())); } } else { - throw new IllegalArgumentException("HBaseRecordReader does not allow column *. 
Column * should have been converted to list of <row_key, column family1, column family2, ..., column family_n"); -// rowKeyOnly = false; -// transformed.add(ROW_KEY_PATH); + rowKeyOnly = false; + transformed.add(ROW_KEY_PATH); } - return transformed; } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java index f9a585e..797ec7f 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanSpec.java @@ -93,5 +93,4 @@ public class HBaseScanSpec { + ", filter=" + (filter == null ? null : filter.toString()) + "]"; } - } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java index 56dfc10..548b679 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -94,7 +94,5 @@ public class HBaseSchemaFactory implements SchemaFactory { public String getTypeName() { return HBaseStoragePluginConfig.NAME; } - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java index 81899cf..62f351c 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -154,7 +154,5 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin { private HBaseStoragePlugin getHBaseStoragePlugin() { return HBaseStoragePlugin.this; } - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index 302ccca..0527391 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -41,7 +41,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -// Class containing information for reading a single HBase region +/** + * Contains information for reading a single HBase region + */ + @JsonTypeName("hbase-region-scan") public class HBaseSubScan extends AbstractBase implements SubScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class); @@ -210,12 +213,10 @@ public class HBaseSubScan extends AbstractBase implements SubScan { + ", filter=" + (getScanFilter() == null ? 
null : getScanFilter().toString()) + ", regionServer=" + regionServer + "]"; } - } @Override public int getOperatorType() { return CoreOperatorType.HBASE_SUB_SCAN_VALUE; } - } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java index e12c77c..b957347 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java @@ -38,7 +38,7 @@ import com.google.common.io.Files; public class BaseHBaseTest extends BaseTestQuery { - private static final String HBASE_STORAGE_PLUGIN_NAME = "hbase"; + public static final String HBASE_STORAGE_PLUGIN_NAME = "hbase"; protected static Configuration conf = HBaseConfiguration.create(); http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java index bdd2fab..c233837 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -71,35 +71,44 @@ public class PrintingResultsListener implements UserResultsListener { } @Override + @SuppressWarnings("resource") public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { final QueryData header = result.getHeader(); final DrillBuf data = result.getData(); - if (data != null) { - count.addAndGet(header.getRowCount()); - try { - loader.load(header.getDef(), data); - // TODO: Clean: DRILL-2933: That load(...) no longer throws - // SchemaChangeException, so check/clean catch clause below. - } catch (SchemaChangeException e) { - submissionFailed(UserException.systemError(e).build(logger)); - } + try { + if (data != null) { + count.addAndGet(header.getRowCount()); + try { + loader.load(header.getDef(), data); + // TODO: Clean: DRILL-2933: That load(...) no longer throws + // SchemaChangeException, so check/clean catch clause below. + } catch (SchemaChangeException e) { + submissionFailed(UserException.systemError(e).build(logger)); + } - switch(format) { - case TABLE: - VectorUtil.showVectorAccessibleContent(loader, columnWidth); - break; - case TSV: - VectorUtil.showVectorAccessibleContent(loader, "\t"); - break; - case CSV: - VectorUtil.showVectorAccessibleContent(loader, ","); - break; + try { + switch(format) { + case TABLE: + VectorUtil.showVectorAccessibleContent(loader, columnWidth); + break; + case TSV: + VectorUtil.showVectorAccessibleContent(loader, "\t"); + break; + case CSV: + VectorUtil.showVectorAccessibleContent(loader, ","); + break; + default: + throw new IllegalStateException(format.toString()); + } + } finally { + loader.clear(); + } } - loader.clear(); } - - result.release(); + finally { + result.release(); + } } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java ---------------------------------------------------------------------- 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java index 318e6b1..c825fcb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -33,9 +33,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName("union-exchange") -public class UnionExchange extends AbstractExchange{ - - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionExchange.class); +public class UnionExchange extends AbstractExchange { public UnionExchange(@JsonProperty("child") PhysicalOperator child) { super(child); @@ -76,5 +74,4 @@ public class UnionExchange extends AbstractExchange{ protected PhysicalOperator getNewWithChild(PhysicalOperator child) { return new UnionExchange(child); } - } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index be7d4ed..cfdc06d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -1,4 +1,4 @@ -/** 
+/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -181,9 +181,6 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { return IterOutcome.OUT_OF_MEMORY; } - -// logger.debug("Next received batch {}", batch); - final RecordBatchDef rbd = batch.getHeader().getDef(); final boolean schemaChanged = batchLoader.load(rbd, batch.getBody()); // TODO: Clean: DRILL-2933: That load(...) no longer throws http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index 649ecd9..6d4f1d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,8 +28,8 @@ import org.apache.drill.exec.work.batch.IncomingBuffers; import org.apache.drill.exec.work.batch.RawBatchBuffer; public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class); + @SuppressWarnings("resource") @Override public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) throws ExecutionSetupException { @@ -42,6 +42,4 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> RawBatchBuffer buffer = buffers[0]; return new UnorderedReceiverBatch(context, buffer, receiver); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java index 33c840b..b15a843 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; import org.apache.calcite.adapter.enumerable.EnumerableTableScan; -import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalProject; http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java ---------------------------------------------------------------------- diff 
--git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java index d974bad..7f634c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index 63dcdb45..564aaed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -136,6 +136,21 @@ public class BatchSchema implements Iterable<MaterializedField> { return true; } + public boolean isEquivalent(BatchSchema other) { + if (fields == null || other.fields == null) { + return fields == other.fields; + } + if (fields.size() != other.fields.size()) { + return false; + } + for (int i = 0; i < fields.size(); i++) { + if (! 
fields.get(i).isEquivalent(other.fields.get(i))) { + return false; + } + } + return true; + } + /** * We treat fields with same set of Subtypes as equal, even if they are in a different order * @param t1 http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 9a8483b..20b5cb5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -19,12 +19,15 @@ package org.apache.drill.exec.record; import io.netty.buffer.DrillBuf; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.drill.common.StackTrace; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.map.CaseInsensitiveMap; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; @@ -38,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; /** @@ -85,7 +87,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp // the schema has changed since the previous call. // Set up to recognize previous fields that no longer exist. 
- final Map<String, ValueVector> oldFields = Maps.newHashMap(); + final Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap(); for(final VectorWrapper<?> wrapper : container) { final ValueVector vector = wrapper.getValueVector(); oldFields.put(vector.getField().getName(), vector); @@ -109,6 +111,17 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp vector.clear(); schemaChanged = true; vector = TypeHelper.getNewVector(fieldDef, allocator); + + // If the field is a map, check if the map schema changed. + + } else if (vector.getField().getType().getMinorType() == MinorType.MAP && + ! isSameSchema(vector.getField().getChildren(), field.getChildList())) { + + // The map schema changed. Discard the old map and create a new one. + + schemaChanged = true; + vector.clear(); + vector = TypeHelper.getNewVector(fieldDef, allocator); } // Load the vector. @@ -121,7 +134,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp newVectors.add(vector); } - // rebuild the schema. final SchemaBuilder builder = BatchSchema.newBuilder(); for (final VectorWrapper<?> v : newVectors) { @@ -150,6 +162,51 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp return schemaChanged; } + /** + * Check if two schemas are the same. The schemas, given as lists, represent the + * children of the original and new maps (AKA structures.) + * + * @param currentChildren current children of a Drill map + * @param newChildren new children, in an incoming batch, of the same + * Drill map + * @return true if the schemas are identical, false if a child is missing + * or has changed type or cardinality (AKA "mode"). + */ + + private boolean isSameSchema(Collection<MaterializedField> currentChildren, + List<SerializedField> newChildren) { + if (currentChildren.size() != newChildren.size()) { + return false; + } + + // Column order can permute (see DRILL-5828). So, use a map + // for matching. 
+ + Map<String, MaterializedField> childMap = CaseInsensitiveMap.newHashMap(); + for (MaterializedField currentChild : currentChildren) { + childMap.put(currentChild.getName(), currentChild); + } + for (SerializedField newChild : newChildren) { + MaterializedField currentChild = childMap.get(newChild.getNamePart().getName()); + + // New map member? + + if (currentChild == null) { + return false; + } + + // Changed data type? + + if (! currentChild.getType().equals(newChild.getMajorType())) { + return false; + } + } + + // Everything matches. + + return true; + } + @Override public TypedFieldId getValueVectorId(SchemaPath path) { return container.getValueVectorId(path); http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 63cac7d..abcb846 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -134,7 +134,7 @@ public class VectorContainer implements VectorAccessible { return addOrGet(field, null); } - @SuppressWarnings({ "resource", "unchecked" }) + @SuppressWarnings("unchecked") public <T extends ValueVector> T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) { final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName())); final ValueVector vector; http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java index 4de4c2a..018653b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.util; import java.util.List; import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.util.DrillStringUtils; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorAccessible; @@ -42,7 +43,7 @@ public class VectorUtil { System.out.println(rows + " row(s):"); List<String> columns = Lists.newArrayList(); for (VectorWrapper<?> vw : va) { - columns.add(vw.getValueVector().getField().getName()); + columns.add(formatFieldSchema(vw.getValueVector().getField())); } int width = columns.size(); @@ -80,6 +81,14 @@ public class VectorUtil { } } + public static String formatFieldSchema(MaterializedField field) { + String colName = field.getName() + "<" + field.getType().getMinorType() + "(" + field.getType().getMode() + ")" + ">"; + if (field.getType().getMinorType() == MinorType.MAP) { + colName += expandMapSchema(field); + } + return colName; + } + public static void appendVectorAccessibleContent(VectorAccessible va, StringBuilder formattedResults, final String delimiter, boolean includeHeader) { if (includeHeader) { @@ -135,9 +144,8 @@ public class VectorUtil { int columnWidth = getColumnWidth(columnWidths, columnIndex); width += columnWidth + 2; formats.add("| %-" + columnWidth + "s"); - MaterializedField field = vw.getValueVector().getField(); - columns.add(field.getName() + "<" + field.getType().getMinorType() + "(" + field.getType().getMode() + ")" + ">"); columnIndex++; + columns.add(formatFieldSchema(vw.getValueVector().getField())); } int rows = va.getRecordCount(); @@ -180,6 +188,19 @@ public class VectorUtil { } } + private static String 
expandMapSchema(MaterializedField mapField) { + StringBuilder buf = new StringBuilder(); + buf.append("{"); + String sep = ""; + for (MaterializedField field : mapField.getChildren()) { + buf.append(sep); + sep = ","; + buf.append(formatFieldSchema(field)); + } + buf.append("}"); + return buf.toString(); + } + public static void allocateVectors(Iterable<ValueVector> valueVectors, int count) { for (final ValueVector v : valueVectors) { AllocationHelper.allocateNew(v, count); http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java index f4c5536..73e8de6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java @@ -183,6 +183,9 @@ public class DistributedQueryQueue implements QueryQueue { */ public boolean isSameAs(ConfigSet otherSet) { + if (otherSet == null) { + return false; + } return queueThreshold == otherSet.queueThreshold && queueTimeout == otherSet.queueTimeout && largeQueueSize == otherSet.largeQueueSize && http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java index 4febe1d..fed2914 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java @@ -18,69 +18,72 @@ package org.apache.drill.exec.record.vector; import static org.junit.Assert.assertEquals; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.DrillBuf; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.util.ArrayList; import java.util.List; import org.apache.drill.categories.VectorTest; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecTest; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.IntVector; -import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VarCharVector; +import org.apache.drill.test.rowSet.SchemaBuilder; import org.junit.Test; - -import com.google.common.collect.Lists; import org.junit.experimental.categories.Category; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + @Category(VectorTest.class) public class TestLoad extends ExecTest { private final DrillConfig drillConfig = DrillConfig.create(); + @SuppressWarnings("resource") @Test public void testLoadValueVector() throws Exception { final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig); - final ValueVector fixedV 
= new IntVector(MaterializedField.create("ints", - Types.required(MinorType.INT)), allocator); - final ValueVector varlenV = new VarCharVector(MaterializedField.create( - "chars", Types.required(MinorType.VARCHAR)), allocator); - final ValueVector nullableVarlenV = new NullableVarCharVector(MaterializedField.create("chars", - Types.optional(MinorType.VARCHAR)), allocator); - - final List<ValueVector> vectors = Lists.newArrayList(fixedV, varlenV, nullableVarlenV); - for (final ValueVector v : vectors) { - AllocationHelper.allocate(v, 100, 50); - v.getMutator().generateTestData(100); - } + BatchSchema schema = new SchemaBuilder() + .add("ints", MinorType.INT) + .add("chars", MinorType.VARCHAR) + .addNullable("chars2", MinorType.VARCHAR) + .build(); - final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false); - final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator); - final ByteBuf[] byteBufs = writableBatch.getBuffers(); - int bytes = 0; - for (ByteBuf buf : byteBufs) { - bytes += buf.writerIndex(); - } - final DrillBuf byteBuf = allocator.buffer(bytes); - int index = 0; - for (ByteBuf buf : byteBufs) { - buf.readBytes(byteBuf, index, buf.writerIndex()); - index += buf.writerIndex(); - } - byteBuf.writerIndex(bytes); + // Create vectors + + final List<ValueVector> vectors = createVectors(allocator, schema, 100); + + // Writeable batch now owns vector buffers + final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false); + + // Serialize the vectors + + final DrillBuf byteBuf = serializeBatch(allocator, writableBatch); + + // Batch loader does NOT take ownership of the serialized buffer + + final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator); batchLoader.load(writableBatch.getDef(), byteBuf); + + // Release the serialized buffer. + + byteBuf.release(); + + // TODO: Replace this with actual validation, not just dumping to the console. 
+ boolean firstColumn = true; int recordCount = 0; for (final VectorWrapper<?> v : batchLoader) { @@ -122,7 +125,294 @@ public class TestLoad extends ExecTest { } } assertEquals(100, recordCount); + + // Free the original vectors + + writableBatch.clear(); + + // Free the deserialized vectors + batchLoader.clear(); + + // The allocator will verify that the frees were done correctly. + + allocator.close(); + } + + // TODO: Replace this low-level code with RowSet usage once + // DRILL-5657 is committed to master. + + private static List<ValueVector> createVectors(BufferAllocator allocator, BatchSchema schema, int i) { + final List<ValueVector> vectors = new ArrayList<>(); + for (MaterializedField field : schema) { + @SuppressWarnings("resource") + ValueVector v = TypeHelper.getNewVector(field, allocator); + AllocationHelper.allocate(v, 100, 50); + v.getMutator().generateTestData(100); + vectors.add(v); + } + return vectors; + } + + private static DrillBuf serializeBatch(BufferAllocator allocator, WritableBatch writableBatch) { + final ByteBuf[] byteBufs = writableBatch.getBuffers(); + int bytes = 0; + for (ByteBuf buf : byteBufs) { + bytes += buf.writerIndex(); + } + final DrillBuf byteBuf = allocator.buffer(bytes); + int index = 0; + for (ByteBuf buf : byteBufs) { + buf.readBytes(byteBuf, index, buf.writerIndex()); + index += buf.writerIndex(); + } + byteBuf.writerIndex(bytes); + return byteBuf; + } + + /** + * Test function to simulate loading a batch. 
+ * + * @param allocator a memory allocator + * @param batchLoader the batch loader under test + * @param schema the schema of the new batch + * @return true if the schema changed relative to the prior batch; + * false if the schema is the same + * @throws SchemaChangeException should not occur + */ + + @SuppressWarnings("resource") + private boolean loadBatch(BufferAllocator allocator, + final RecordBatchLoader batchLoader, + BatchSchema schema) throws SchemaChangeException { + final List<ValueVector> vectors = createVectors(allocator, schema, 100); + final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false); + final DrillBuf byteBuf = serializeBatch(allocator, writableBatch); + boolean result = batchLoader.load(writableBatch.getDef(), byteBuf); + byteBuf.release(); writableBatch.clear(); + return result; + } + + @Test + public void testSchemaChange() throws SchemaChangeException { + final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig); + final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator); + + // Initial schema: a: INT, b: VARCHAR + // Schema change: N/A + + BatchSchema schema1 = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .build(); + { + assertTrue(loadBatch(allocator, batchLoader, schema1)); + assertTrue(schema1.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Same schema + // Schema change: No + + { + assertFalse(loadBatch(allocator, batchLoader, schema1)); + assertTrue(schema1.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Reverse columns: b: VARCHAR, a: INT + // Schema change: No + + { + BatchSchema schema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .add("a", MinorType.INT) + .build(); + assertFalse(loadBatch(allocator, batchLoader, schema)); + + // Potential bug: see DRILL-5828 + + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + 
batchLoader.getContainer().zeroVectors(); + } + + // Drop a column: a: INT + // Schema change: Yes + + { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Add a column: a: INT, b: VARCHAR, c: INT + // Schema change: Yes + + { + assertTrue(loadBatch(allocator, batchLoader, schema1)); + assertTrue(schema1.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.INT) + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Change a column type: a: INT, b: VARCHAR, c: VARCHAR + // Schema change: Yes + + { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Empty schema + // Schema change: Yes + + { + BatchSchema schema = new SchemaBuilder() + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + batchLoader.clear(); + allocator.close(); + } + + @Test + public void testMapSchemaChange() throws SchemaChangeException { + final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig); + final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator); + + // Initial schema: a: INT, m: MAP{} + + BatchSchema schema1 = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .buildMap() + 
.build(); + { + assertTrue(loadBatch(allocator, batchLoader, schema1)); + assertTrue(schema1.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Same schema + // Schema change: No + + { + assertFalse(loadBatch(allocator, batchLoader, schema1)); + assertTrue(schema1.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Add column to map: a: INT, m: MAP{b: VARCHAR} + // Schema change: Yes + + BatchSchema schema2 = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.VARCHAR) + .buildMap() + .build(); + { + assertTrue(loadBatch(allocator, batchLoader, schema2)); + assertTrue(schema2.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Same schema + // Schema change: No + + { + assertFalse(loadBatch(allocator, batchLoader, schema2)); + assertTrue(schema2.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Add column: a: INT, m: MAP{b: VARCHAR, c: INT} + // Schema change: Yes + + { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.VARCHAR) + .add("c", MinorType.INT) + .buildMap() + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Drop a column: a: INT, m: MAP{b: VARCHAR} + // Schema change: Yes + + { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.VARCHAR) + .buildMap() + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Change type: a: INT, m: MAP{b: INT} + // Schema change: Yes + + { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("m") + .add("b", MinorType.INT) + 
.buildMap() + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Empty map: a: INT, m: MAP{} + + { + assertTrue(loadBatch(allocator, batchLoader, schema1)); + assertTrue(schema1.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + // Drop map: a: INT + + { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .build(); + assertTrue(loadBatch(allocator, batchLoader, schema)); + assertTrue(schema.isEquivalent(batchLoader.getSchema())); + batchLoader.getContainer().zeroVectors(); + } + + batchLoader.clear(); + allocator.close(); } } http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java index 37fcdfd..e7bf61f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java @@ -345,6 +345,10 @@ public class QueryBuilder { } } + public QueryRowSetIterator rowSetIterator( ) { + return new QueryRowSetIterator(client.allocator(), withEventListener()); + } + /** * Run the query that is expected to return (at least) one row * with the only (or first) column returning a long value. 
http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java new file mode 100644 index 0000000..c329690 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.test; + +import java.util.Iterator; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.test.BufferingQueryEventListener.QueryEvent; +import org.apache.drill.test.rowSet.DirectRowSet; + +public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<DirectRowSet> { + private final BufferingQueryEventListener listener; + private int recordCount = 0; + private int batchCount = 0; + QueryId queryId = null; + private BufferAllocator allocator; + private QueryDataBatch batch; + private QueryState state; + + QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener listener) { + this.allocator = allocator; + this.listener = listener; + } + + public QueryId queryId() { return queryId; } + public String queryIdString() { return QueryIdHelper.getQueryId(queryId); } + public QueryState finalState() { return state; } + public int batchCount() { return batchCount; } + public int rowCount() { return recordCount; } + + @Override + public boolean hasNext() { + for ( ; ; ) { + QueryEvent event = listener.get(); + state = event.state; + batch = null; + switch (event.type) + { + case BATCH: + batchCount++; + recordCount += event.batch.getHeader().getRowCount(); + batch = event.batch; + return true; + case EOF: + state = event.state; + return false; + case ERROR: + throw new RuntimeException(event.error); + case QUERY_ID: + queryId = event.queryId; + break; + default: + throw new IllegalStateException("Unexpected event: " + event.type); + } + } + } + + @Override + public DirectRowSet 
next() { + + if (batch == null) { + throw new IllegalStateException(); + } + + // Unload the batch and convert to a row set. + + final RecordBatchLoader loader = new RecordBatchLoader(allocator); + try { + loader.load(batch.getHeader().getDef(), batch.getData()); + batch.release(); + batch = null; + VectorContainer container = loader.getContainer(); + container.setRecordCount(loader.getRecordCount()); + return new DirectRowSet(allocator, container); + } catch (SchemaChangeException e) { + throw new IllegalStateException(e); + } + } + + public void printAll() { + for (DirectRowSet rowSet : this) { + rowSet.print(); + rowSet.clear(); + } + } + + @Override + public Iterator<DirectRowSet> iterator() { + return this; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java ---------------------------------------------------------------------- diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java index a9feafd..5029c56 100644 --- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java +++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -52,7 +52,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { private final int offset; private final BufferLedger ledger; private final BufferManager bufManager; - private final ByteBufAllocator alloc; +// private final ByteBufAllocator alloc; private final boolean isEmpty; private volatile int length; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? 
@@ -72,7 +72,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { this.udle = byteBuf; this.isEmpty = isEmpty; this.bufManager = manager; - this.alloc = alloc; +// this.alloc = alloc; this.addr = byteBuf.memoryAddress() + offset; this.ledger = ledger; this.length = length; http://git-wip-us.apache.org/repos/asf/drill/blob/42f7af22/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java ---------------------------------------------------------------------- diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java index e2b44a7..4d29d55 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java @@ -25,6 +25,7 @@ import java.util.Objects; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.expr.BasicTypeHelper; import org.apache.drill.exec.proto.UserBitShared.NamePart; import org.apache.drill.exec.proto.UserBitShared.SerializedField; @@ -86,6 +87,7 @@ public class MaterializedField { children.add(field); } + @Override public MaterializedField clone() { return withPathAndType(name, getType()); } @@ -168,6 +170,57 @@ public class MaterializedField { Objects.equals(this.type, other.type); } + public boolean isEquivalent(MaterializedField other) { + if (! name.equalsIgnoreCase(other.name)) { + return false; + } + + // Requires full type equality, including fields such as precision and scale. + // But, unset fields are equivalent to 0. Can't use the protobuf-provided + // isEquals(), that treats set and unset fields as different. 
+ + if (type.getMinorType() != other.type.getMinorType()) { + return false; + } + if (type.getMode() != other.type.getMode()) { + return false; + } + if (type.getScale() != other.type.getScale()) { + return false; + } + if (type.getPrecision() != other.type.getPrecision()) { + return false; + } + + // Compare children -- but only for maps, not the internal children + // for Varchar, repeated or nullable types. + + if (type.getMinorType() != MinorType.MAP) { + return true; + } + + if (children == null || other.children == null) { + return children == other.children; + } + if (children.size() != other.children.size()) { + return false; + } + + // Maps are name-based, not position. But, for our + // purposes, we insist on identical ordering. + + Iterator<MaterializedField> thisIter = children.iterator(); + Iterator<MaterializedField> otherIter = other.children.iterator(); + while (thisIter.hasNext()) { + MaterializedField thisChild = thisIter.next(); + MaterializedField otherChild = otherIter.next(); + if (! thisChild.isEquivalent(otherChild)) { + return false; + } + } + return true; + } + /** * <p>Creates materialized field string representation. * Includes field name, its type with precision and scale if any and data mode.