Reduce the likelihood of phantom schema changes in distributed query plans.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/98bc9e19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/98bc9e19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/98bc9e19 Branch: refs/heads/master Commit: 98bc9e19c153ac6f70ec58fbe37fcb2abc9de3f7 Parents: a8bfbf1 Author: Jacques Nadeau <[email protected]> Authored: Thu Sep 5 21:17:58 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Sep 5 22:39:55 2013 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/physical/impl/ScanBatch.java | 2 +- .../drill/exec/physical/impl/aggregate/AggBatch.java | 2 +- .../apache/drill/exec/physical/impl/sort/SortBatch.java | 4 ++-- .../apache/drill/exec/record/AbstractRecordBatch.java | 2 +- .../drill/exec/record/AbstractSingleRecordBatch.java | 2 +- .../apache/drill/exec/record/HyperVectorWrapper.java | 4 ++-- .../apache/drill/exec/record/SimpleVectorWrapper.java | 2 +- .../org/apache/drill/exec/record/VectorContainer.java | 12 ++++++++---- .../org/apache/drill/exec/record/VectorWrapper.java | 2 +- .../drill/exec/record/selection/SelectionVector4.java | 2 ++ .../apache/drill/exec/vector/BaseDataValueVector.java | 12 +++++++++--- .../exec/store/parquet/ParquetRecordReaderTest.java | 2 +- 12 files changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index ae043ec..a02e5f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -156,7 +156,7 @@ public class ScanBatch implements RecordBatch { @Override public void removeAllFields() { for(VectorWrapper<?> vw : container){ - vw.release(); + vw.clear(); } container.clear(); fieldVectorMap.clear(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java index 97e66f9..806239b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java @@ -90,7 +90,7 @@ public class AggBatch extends AbstractRecordBatch<StreamingAggregate> { logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); switch(out){ case CLEANUP_AND_RETURN: - container.clear(); + container.zeroVectors(); done = true; return aggregator.getOutcome(); case RETURN_OUTCOME: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 09ae687..61bcf34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -75,7 +75,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { @Override protected void cleanup() { super.cleanup(); - container.clear(); + container.zeroVectors();; sv4.clear(); } @@ -98,7 +98,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { break outer; case NOT_YET: case STOP: - container.clear(); + container.zeroVectors(); return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 0d44368..aba023d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -42,7 +42,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements @Override public void kill() { - container.clear(); + container.zeroVectors(); killIncoming(); cleanup(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index fb2fc3a..63c31a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -29,7 +29,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte case NONE: case NOT_YET: case STOP: - container.clear(); + container.zeroVectors(); return upstream; case OK_NEW_SCHEMA: try{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index 5bb6208..5d07849 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -50,14 +50,14 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< } @Override - public void release() { + public void clear() { if(!releasable) return; for(T x : vectors){ x.clear(); } } - + @Override @SuppressWarnings("unchecked") public VectorWrapper<T> cloneAndTransfer() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index 62ca8a4..7ff4b53 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -47,7 +47,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper } @Override - public void release() { + public void clear() { v.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 7c1e0ad..14c3a8d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -93,7 +93,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> { for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) { VectorWrapper<?> w = iter.next(); if (!w.isHyper() && v == w.getValueVector()) { - w.release(); + w.clear(); iter.remove(); return; } @@ -148,11 +148,15 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> { public void clear() { // TODO: figure out a better approach for this. // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no - // data. + // data. // schema = null; + zeroVectors(); + wrappers.clear(); + } + + public void zeroVectors(){ for (VectorWrapper<?> w : wrappers) { - w.release(); + w.clear(); } - wrappers.clear(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java index 1c5308e..5188cc3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java @@ -10,6 +10,6 @@ public interface VectorWrapper<T extends ValueVector> { public T getValueVector(); public T[] getValueVectors(); public boolean isHyper(); - public void release(); + public void clear(); public VectorWrapper<T> cloneAndTransfer(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java index ebfb9e4..4b6c2c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java @@ -92,6 +92,8 @@ public class SelectionVector4 { } public void clear(){ + start = 0; + length = 0; this.vector.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java index f41dcd2..b82b14e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java @@ -32,14 +32,20 @@ public abstract class BaseDataValueVector extends BaseValueVector{ @Override public ByteBuf[] getBuffers(){ - ByteBuf[] out = new ByteBuf[]{data}; - data.readerIndex(0); - data.retain(); + ByteBuf[] out; + if(valueCount == 0){ + out = new ByteBuf[0]; + }else{ + out = new ByteBuf[]{data}; + data.readerIndex(0); + data.retain(); + } clear(); return out; } public int getBufferSize() { + if(valueCount == 0) return 0; return data.writerIndex(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index e66ec49..267848e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -209,7 +209,7 @@ public class ParquetRecordReaderTest { } for(VectorWrapper<?> vw : batchLoader){ - vw.release(); + vw.clear(); } result.release();
