DRILL-1643, DRILL-1665: Flatten fixes - Fix repeated map vector to correctly report value count - Update flatten so init variables are reset for each new batch.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ed962497 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ed962497 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ed962497 Branch: refs/heads/master Commit: ed962497f04d432591b33b7532741b07ab46fbfe Parents: 761156b Author: Jason Altekruse <[email protected]> Authored: Wed Nov 5 18:11:22 2014 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Tue Nov 11 16:48:45 2014 -0800 ---------------------------------------------------------------------- .../impl/flatten/FlattenRecordBatch.java | 2 + .../physical/impl/flatten/FlattenTemplate.java | 40 +++-- .../exec/physical/impl/flatten/Flattener.java | 1 + .../exec/vector/complex/RepeatedMapVector.java | 9 +- .../exec/physical/impl/flatten/TestFlatten.java | 99 +++++++++++++ .../store/json/test_flatten_mappify2.json | 148 +++++++++++++++++++ 6 files changed, 286 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 129174e..66c6168 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -161,6 +161,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { this.recordCount = remainderIndex; } else { setValueCount(outputRecords); + flattener.resetGroupIndex(); 
for(VectorWrapper<?> v: incoming) { v.clear(); } @@ -194,6 +195,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { for (VectorWrapper<?> v : incoming) { v.clear(); } + flattener.resetGroupIndex(); this.recordCount = remainingRecordCount; } // In case of complex writer expression, vectors would be added to batch run-time. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java index af4cead..c5d3d93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java @@ -30,6 +30,8 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import com.google.common.collect.ImmutableList; + +import org.apache.drill.exec.vector.RepeatedFixedWidthVector.RepeatedAccessor; import org.apache.drill.exec.vector.RepeatedVector; public abstract class FlattenTemplate implements Flattener { @@ -40,7 +42,9 @@ public abstract class FlattenTemplate implements Flattener { private SelectionVector4 vector4; private SelectionVectorMode svMode; RepeatedVector fieldToFlatten; + RepeatedAccessor accessor; private int groupIndex; + // this allows for groups to be written between batches if we run out of space, for cases where we have finished // a batch on the boundary it will be set to 0 private int childIndexWithinCurrGroup; @@ -56,6 +60,7 @@ public abstract class FlattenTemplate implements Flattener { @Override public void setFlattenField(RepeatedVector flattenField) { 
this.fieldToFlatten = flattenField; + this.accessor = flattenField.getAccessor(); } public RepeatedVector getFlattenField() { @@ -76,18 +81,26 @@ public abstract class FlattenTemplate implements Flattener { if (childIndexWithinCurrGroup == -1) { childIndexWithinCurrGroup = 0; } - outer: - for ( ; groupIndex < fieldToFlatten.getAccessor().getGroupCount(); groupIndex++) { - currGroupSize = fieldToFlatten.getAccessor().getGroupSizeAtIndex(groupIndex); - for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) { - if (!doEval(groupIndex, firstOutputIndex)) { - break outer; + outer: { + final int groupCount = accessor.getGroupCount(); + for ( ; groupIndex < groupCount; groupIndex++) { + currGroupSize = accessor.getGroupSizeAtIndex(groupIndex); + for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) { + if (!doEval(groupIndex, firstOutputIndex)) { + break outer; + } + firstOutputIndex++; + childIndex++; } - firstOutputIndex++; - childIndex++; + childIndexWithinCurrGroup = 0; } - childIndexWithinCurrGroup = 0; } +// System.out.println(String.format("startIndex %d, recordCount %d, firstOutputIndex: %d, currGroupSize: %d, childIndexWithinCurrGroup: %d, groupIndex: %d", startIndex, recordCount, firstOutputIndex, currGroupSize, childIndexWithinCurrGroup, groupIndex)); +// try{ +//// Thread.sleep(1000); +// }catch(Exception e){ +// +// } for (TransferPair t : transfers) { t.splitAndTransfer(startIndex, childIndex - startIndex); @@ -113,6 +126,15 @@ public abstract class FlattenTemplate implements Flattener { doSetup(context, incoming, outgoing); } + + + @Override + public void resetGroupIndex() { + this.groupIndex = 0; + this.currGroupSize = 0; + this.childIndex = 0; + } + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java index 49b9c1b..2141ca2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java @@ -32,6 +32,7 @@ public interface Flattener { public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex); public void setFlattenField(RepeatedVector repeatedColumn); public RepeatedVector getFlattenField(); + public void resetGroupIndex(); public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index 99b9453..9b7011c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -420,7 +420,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat public void load(SerializedField metadata, DrillBuf buf) { List<SerializedField> fields = metadata.getChildList(); - int bufOffset = offsets.load(metadata.getValueCount()+1, buf); + int bufOffset 
= offsets.load(metadata.getGroupCount()+1, buf); for (SerializedField fmd : fields) { MaterializedField fieldDef = MaterializedField.create(fmd); @@ -444,7 +444,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat SerializedField.Builder b = getField() // .getAsBuilder() // .setBufferLength(getBufferSize()) // - .setValueCount(accessor.getValueCount()); + .setGroupCount(accessor.getGroupCount()); for (ValueVector v : vectors.values()) { b.addChild(v.getMetadata()); } @@ -489,7 +489,8 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat @Override public int getValueCount() { - return offsets.getAccessor().getValueCount() - 1; + return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1); +// return offsets.getAccessor().getValueCount() - 1; } public int getGroupSizeAtIndex(int index) { @@ -542,7 +543,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat @Override public int getGroupCount() { - return size(); + return offsets.getAccessor().getValueCount() - 1; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java index 9514517..d4c19a3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java @@ -22,21 +22,120 @@ import org.junit.Test; public class TestFlatten extends BaseTestQuery { + /** + * enable this if you have the following files: + * - /tmp/yelp_academic_dataset_business.json + * - /tmp/mapkv.json + * - /tmp/drill1665.json + */ + public static 
boolean RUN_ADVANCED_TESTS = false; + + + @Test + public void testFlattenFailure() throws Exception { + test("select flatten(complex), rownum from cp.`/store/json/test_flatten_mappify2.json`"); +// test("select complex, rownum from cp.`/store/json/test_flatten_mappify2.json`"); + } + @Test public void testKVGenFlatten1() throws Exception { + // works - TODO and verify results test("select flatten(kvgen(f1)) as monkey, x " + "from cp.`/store/json/test_flatten_mapify.json`"); } @Test public void testTwoFlattens() throws Exception { + // second re-write rule has been added to test the fixes together, this now runs test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`"); } @Test + public void testFlattenRepeatedMap() throws Exception { + test("select `integer`, `float`, x, flatten(z) from cp.`/jsoninput/input2.json`"); + } + + @Test + public void testFlattenKVGenFlatten() throws Exception { + // currently does not fail, but produces incorrect results, requires second re-write rule to split up expressions + // with complex outputs + test("select `integer`, `float`, x, flatten(kvgen(flatten(z))) from cp.`/jsoninput/input2.json`"); + } + + @Test + public void testKVGenFlatten2() throws Exception { + // currently runs + // TODO - re-verify results by hand + if(RUN_ADVANCED_TESTS){ + test("select flatten(kvgen(visited_cellid_counts)) as mytb from dfs.`/tmp/mapkv.json`") ; + } + } + + @Test public void testFilterFlattenedRecords() throws Exception { + // WORKS!! 
+ // TODO - hand verify results test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " + "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1"); } + @Test + public void testFilterFlattenedRecords2() throws Exception { + // previously failed in generated code + // "value" is neither a method, a field, nor a member class of "org.apache.drill.exec.expr.holders.RepeatedVarCharHolder" [ 42eb1fa1-0742-4e4f-8723-609215c18900 on 10.250.0.86:31010 ] + // appears to be resolving the data coming out of flatten as repeated, check fast schema stuff + + // FIXED BY RETURNING PROPER SCHEMA DURING FAST SCHEMA STEP + // these types of problems are being solved more generally as we develop better support for changing schema + if(RUN_ADVANCED_TESTS){ + test("select celltbl.catl from (\n" + + " select flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b limit 100\n" + + " ) celltbl where celltbl.catl = 'Doctors'"); + } + } + + @Test + public void countAggFlattened() throws Exception { + if(RUN_ADVANCED_TESTS){ + test("select celltbl.catl, count(celltbl.catl) from ( " + + "select business_id, flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b limit 100 " + + ") celltbl group by celltbl.catl limit 10 "); + } + } + + + @Test + public void flattenAndAdditionalColumn() throws Exception { + if(RUN_ADVANCED_TESTS){ + test("select business_id, flatten(categories) from dfs.`/tmp/yelp_academic_dataset_business.json` b"); + } + } + + @Test + public void testFailingFlattenAlone() throws Exception { + if(RUN_ADVANCED_TESTS){ + test("select flatten(categories) from dfs.`/tmp/yelp_academic_dataset_business.json` b "); + } + } + + @Test + public void testDistinctAggrFlattened() throws Exception { + if(RUN_ADVANCED_TESTS){ + test(" select distinct(celltbl.catl) from (\n" + + " select flatten(categories) catl from 
dfs.`/tmp/yelp_academic_dataset_business.json` b\n" + + " ) celltbl"); + } + + } + + @Test + public void testDrill1665() throws Exception { + if(RUN_ADVANCED_TESTS){ + test("select id, flatten(evnts) as rpt from dfs.`/tmp/drill1665.json`"); + } + + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json b/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json new file mode 100644 index 0000000..f53a0c7 --- /dev/null +++ b/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json @@ -0,0 +1,148 @@ +{ + "rownum": 1, + "bigintegercol": { + "int_1": 1, + "int_2": 2, + "int_3": 3 + }, + "varcharcol": { + "varchar_1": "abc", + "varchar_2": "def", + "varchar_3": "xyz" + }, + "boolcol": { + "boolean_1": true, + "boolean_2": false, + "boolean_3": true + }, + "float8col": { + "f8_1": 1.1, + "f8_2": 2.2 + }, + "complex": [ + { + "col1": 3 + }, + { + "col2": 2, + "col3": 1 + }, + { + "col1": 7 + } + ] +} +{ + "rownum": 2, + "bigintegercol": { + "int_1": 1, + "int_2": 2 + }, + "varcharcol": { + "varchar_1": "abcd" + }, + "boolcol": { + "boolean_1": true + }, + "float8col": { + "f8_1": 1.1, + "f8_2": 2.2, + "f8_3": 3.3 + }, + "complex": [ + { + "col2": 2, + "col3": 1 + }, + { + "col1": 7 + } + ] +} +{ + "rownum": 3, + "bigintegercol": { + "int_1": 1, + "int_3": 3 + }, + "varcharcol": { + "varchar_1": "abcde", + "varchar_2": null, + "varchar_3": "xyz", + "varchar_4": "xyz2" + }, + "boolcol": { + "boolean_1": true, + "boolean_2": false + }, + "float8col": { + "f8_1": 1.1, + "f8_3": 6.6 + }, + "complex": [ + { + "col1": 2, + "col3": 1 + } + ] +} +{ + "rownum": 4, + "bigintegercol": { + "int_2": 2, + "int_3": 3 + }, + "varcharcol": { + "varchar_1": "abc", + "varchar_2": "def" + }, + "boolcol": { + 
"boolean_1": true, + "boolean_2": false, + "boolean_3": null + }, + "float8col": { + "f8_1": 1.1, + "f8_2": 2.2 + }, + "complex": [ + { + "col1": 3, + "col2": 2 + }, + { + "col3": 1, + "col1": 7 + } + ] +} +{ + "rownum": 5, + "bigintegercol": { + "int_2": 2, + "int_3": 3 + }, + "varcharcol": { + "varchar_1": "abc", + "varchar_2": "def" + }, + "boolcol": { + "boolean_1": true, + "boolean_2": false, + "boolean_3": null + }, + "float8col": { + "f8_1": 1.1, + "f8_2": 2.2 + }, + "complex": [ + { + "col1": 3, + "col2": 2 + }, + { + "col3": 1, + "col1": 7 + } + ] +}
