DRILL-5514: Enhance VectorContainer to merge two row sets Adds ability to merge two schemas and to merge two vector containers, in each case producing a new, merged result. See DRILL-5514 for details.
Also provides a handy constructor to create a vector container given a pre-defined schema. closes #837 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/be43a9ed Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/be43a9ed Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/be43a9ed Branch: refs/heads/master Commit: be43a9edd148ef3af6f92c5ce7cda235c5ac1ad6 Parents: b714b2d Author: Paul Rogers <prog...@maprtech.com> Authored: Mon May 15 15:59:35 2017 -0700 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Mon Jun 19 12:22:10 2017 +0300 ---------------------------------------------------------------------- .../apache/drill/exec/record/BatchSchema.java | 28 +++++ .../drill/exec/record/VectorContainer.java | 54 +++++++- .../drill/exec/record/TestVectorContainer.java | 126 +++++++++++++++++++ .../apache/drill/test/rowSet/DirectRowSet.java | 5 + .../drill/test/rowSet/HyperRowSetImpl.java | 5 + .../drill/test/rowSet/IndirectRowSet.java | 5 + .../org/apache/drill/test/rowSet/RowSet.java | 2 + 7 files changed, 221 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index e9dcd28..63dcdb45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.record; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -157,4 +158,31 @@ public class BatchSchema implements Iterable<MaterializedField> 
{ return true; } + /** + * Merge two schemas to produce a new, merged schema. The caller is responsible + * for ensuring that column names are unique. The order of the fields in the + * new schema is the same as that of this schema, with the other schema's fields + * appended in the order defined in the other schema. + * <p> + * Merging data with selection vectors is unlikely to be useful, or work well. + * With a selection vector, the two record batches would have to be correlated + * both in their selection vectors AND in the underlying vectors. Such a use case + * is hard to imagine. So, for now, this method forbids merging schemas if either + * of them carries a selection vector. If we discover a meaningful use case, we can + * revisit the issue. + * @param otherSchema the schema to merge with this one + * @return the new, merged, schema + */ + + public BatchSchema merge(BatchSchema otherSchema) { + if (selectionVectorMode != SelectionVectorMode.NONE || + otherSchema.selectionVectorMode != SelectionVectorMode.NONE) { + throw new IllegalArgumentException("Cannot merge schemas with selection vectors"); + } + List<MaterializedField> mergedFields = + new ArrayList<>(fields.size() + otherSchema.fields.size()); + mergedFields.addAll(this.fields); + mergedFields.addAll(otherSchema.fields); + return new BatchSchema(selectionVectorMode, mergedFields); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 69e04ac..54a04bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -60,6 +60,28 @@ public class 
VectorContainer implements VectorAccessible { this.allocator = allocator; } + /** + * Create a new vector container given a pre-defined schema. Creates the + * corresponding vectors, but does not allocate memory for them. Call + * {@link #allocateNew()} or {@link #allocateNewSafe()} to allocate + * memory. + * <p> + * Note that this method does the equivalent of {@link #buildSchema(SelectionVectorMode)} + * using the schema provided. + * + * @param allocator allocator to be used to allocate memory later + * @param schema the schema that defines the vectors to create + */ + + public VectorContainer(BufferAllocator allocator, BatchSchema schema) { + this.allocator = allocator; + for (MaterializedField field : schema) { + addOrGet(field, null); + } + this.schema = schema; + schemaChanged = false; + } + @Override public String toString() { return super.toString() @@ -304,7 +326,6 @@ public class VectorContainer implements VectorAccessible { } return va.getChildWrapper(fieldIds); - } private VectorWrapper<?> getValueAccessorById(int... fieldIds) { @@ -375,9 +396,7 @@ public class VectorContainer implements VectorAccessible { * Clears the contained vectors. (See {@link ValueVector#clear}). */ public void zeroVectors() { - for (VectorWrapper<?> w : wrappers) { - w.clear(); - } + VectorAccessibleUtilities.clear(this); } public int getNumberOfColumns() { @@ -398,4 +417,31 @@ public class VectorContainer implements VectorAccessible { } return true; } + + /** + * Merge two batches to create a single, combined, batch. Vectors + * appear in the order defined by {@link BatchSchema#merge(BatchSchema)}. + * The two batches must have identical row counts. The pattern is that + * this container is the main part of the record batch, the other + * represents new columns to merge. + * <p> + * Reference counts on the underlying buffers are <b>unchanged</b>. + * The client code is assumed to abandon the two input containers in + * favor of the merged container. 
+ * + * @param otherContainer the container to merge with this one + * @return a new, merged, container + */ + public VectorContainer merge(VectorContainer otherContainer) { + if (recordCount != otherContainer.recordCount) { + throw new IllegalArgumentException(); + } + VectorContainer merged = new VectorContainer(allocator); + merged.schema = schema.merge(otherContainer.schema); + merged.recordCount = recordCount; + merged.wrappers.addAll(wrappers); + merged.wrappers.addAll(otherContainer.wrappers); + merged.schemaChanged = false; + return merged; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java new file mode 100644 index 0000000..d7a59bf --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestVectorContainer.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.record; + +import static org.junit.Assert.*; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.test.DrillTest; +import org.apache.drill.test.OperatorFixture; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.SchemaBuilder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestVectorContainer extends DrillTest { + + // TODO: Replace the following with an extension of SubOperatorTest class + // once that is available. + + protected static OperatorFixture fixture; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + fixture = OperatorFixture.standardFixture(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + fixture.close(); + } + + /** + * Test of the ability to merge two schemas and to merge + * two vector containers. The merge is "horizontal", like + * a row-by-row join. Since each container is a list of + * vectors, we just combine the two lists to create the + * merged result. 
+ */ + @Test + public void testContainerMerge() { + + // Simulated data from a reader + + BatchSchema leftSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .build(); + SingleRowSet left = fixture.rowSetBuilder(leftSchema) + .add(10, "fred") + .add(20, "barney") + .add(30, "wilma") + .build(); + + // Simulated "implicit" columns: row number and file name + + BatchSchema rightSchema = new SchemaBuilder() + .add("x", MinorType.SMALLINT) + .add("y", MinorType.VARCHAR) + .build(); + SingleRowSet right = fixture.rowSetBuilder(rightSchema) + .add(1, "foo.txt") + .add(2, "bar.txt") + .add(3, "dino.txt") + .build(); + + // The merged batch we expect to see + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .add("x", MinorType.SMALLINT) + .add("y", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .add(10, "fred", 1, "foo.txt") + .add(20, "barney", 2, "bar.txt") + .add(30, "wilma", 3, "dino.txt") + .build(); + + // Merge containers without selection vector + + RowSet merged = fixture.wrap( + left.container().merge(right.container())); + + RowSetComparison comparison = new RowSetComparison(expected); + comparison.verify(merged); + + // Merge containers via row set facade + + RowSet mergedRs = left.merge(right); + comparison.verifyAndClear(mergedRs); + + // Add a selection vector. Merging is forbidden, in the present code, + // for batches that have a selection vector. 
+ + SingleRowSet leftIndirect = left.toIndirect(); + try { + leftIndirect.merge(right); + fail(); + } catch (IllegalArgumentException e) { + // Expected + } + leftIndirect.clear(); + right.clear(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java index 706db27..29a1702 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/DirectRowSet.java @@ -233,4 +233,9 @@ public class DirectRowSet extends AbstractSingleRowSet implements ExtendableRowS @Override public SelectionVector2 getSv2() { return null; } + + @Override + public RowSet merge(RowSet other) { + return new DirectRowSet(allocator, container().merge(other.container())); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java index c7cb1b2..afc2e6e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java @@ -292,4 +292,9 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet { @Override public int rowCount() { return sv4.getCount(); } + + @Override + public RowSet merge(RowSet other) { + return new HyperRowSetImpl(allocator, container().merge(other.container()), sv4); + } } 
http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java index f90fbb7..17a0ac8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java @@ -122,4 +122,9 @@ public class IndirectRowSet extends AbstractSingleRowSet { RecordBatchSizer sizer = new RecordBatchSizer(container, sv2); return sizer.actualSize(); } + + @Override + public RowSet merge(RowSet other) { + return new IndirectRowSet(allocator, container().merge(other.container()), sv2); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/be43a9ed/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java index d22139c..b6bbd4f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSet.java @@ -162,6 +162,8 @@ public interface RowSet { int size(); + RowSet merge(RowSet other); + BatchSchema batchSchema(); /**