Repository: drill
Updated Branches:
  refs/heads/master 030189f90 -> a03f5429e
DRILL-5839: Handle Empty Batches in Merge Receiver

closes #974

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a03f5429
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a03f5429
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a03f5429

Branch: refs/heads/master
Commit: a03f5429e368cf73286eec6101871f6e61a5b7d1
Parents: 030189f
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Sat Sep 30 17:33:17 2017 -0700
Committer: Paul Rogers <prog...@maprtech.com>
Committed: Thu Oct 5 23:25:40 2017 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java | 102 +++++++++++++++++--
 .../drill/exec/store/mock/MockRecordReader.java | 12 +++
 .../impl/mergereceiver/TestMergingReceiver.java | 79 ++++++++++++++
 .../mergerecv/empty_batch_noschema.json | 51 ++++++++++
 .../multiple_providers_empty_batches.json | 67 ++++++++++++
 5 files changed, 300 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ff3ac91..35c1f81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.ArrayList;
 import
org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.drill.common.exceptions.DrillRuntimeException; @@ -45,6 +46,7 @@ import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared; @@ -188,6 +190,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> return IterOutcome.NONE; } + List<UserBitShared.SerializedField> fieldList = null; + boolean createDummyBatch = false; + // lazy initialization if (!hasRun) { schemaChanged = true; // first iteration is always a schema change @@ -214,14 +219,23 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> return IterOutcome.STOP; } - assert rawBatch != null : "rawBatch is null although context.shouldContinue() == true"; + // If rawBatch is null, go ahead and add it to the list. We will create dummy batches + // for all null batches later. + if (rawBatch == null) { + createDummyBatch = true; + rawBatches.add(rawBatch); + continue; + } + + if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) { + // save the schema to fix up empty batches with no schema if needed. + fieldList = rawBatch.getHeader().getDef().getFieldList(); + } + if (rawBatch.getHeader().getDef().getRecordCount() != 0) { rawBatches.add(rawBatch); } else { - // save an empty batch to use for schema purposes. ignore batch if it contains no fields, and thus no schema - if (emptyBatch == null && rawBatch.getHeader().getDef().getFieldCount() != 0) { - emptyBatch = rawBatch; - } + // keep reading till we get a batch with record count > 0 or we have no more batches to read i.e. 
we get null try { while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) { // Do nothing @@ -235,15 +249,48 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> clearBatches(rawBatches); return IterOutcome.STOP; } - if (rawBatch != null) { - rawBatches.add(rawBatch); - } else { - rawBatches.add(emptyBatch); + if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) { + createDummyBatch = true; } + // Even if rawBatch is null, go ahead and add it to the list. + // We will create dummy batches for all null batches later. + rawBatches.add(rawBatch); } p++; } + // If no batch arrived with schema from any of the providers, just return NONE. + if (fieldList == null) { + return IterOutcome.NONE; + } + + // Go through and fix schema for empty batches. + if (createDummyBatch) { + // Create dummy record batch definition with 0 record count + UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder() + // we cannot use/modify the original field list as that is used by + // valid record batch. + // create a copy of field list with valuecount = 0 for all fields. + // This is for dummy schema generation. 
+ .addAllField(createDummyFieldList(fieldList)) + .setRecordCount(0) + .build(); + + // Create dummy header + BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder() + .setIsLastBatch(true) + .setDef(dummyDef) + .build(); + + for (int i = 0; i < p; i++) { + RawFragmentBatch rawBatch = rawBatches.get(i); + if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) { + rawBatch = new RawFragmentBatch(dummyHeader, null, null); + rawBatches.set(i, rawBatch); + } + } + } + // allocate the incoming record batch loaders senderCount = rawBatches.size(); incomingBatches = new RawFragmentBatch[senderCount]; @@ -435,6 +482,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> if (prevBatchWasFull) { break; } + } // set the value counts in the outgoing vectors @@ -454,6 +502,39 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } + // Create dummy field that will be used for empty batches. + private UserBitShared.SerializedField createDummyField(UserBitShared.SerializedField field) { + UserBitShared.SerializedField.Builder newDummyFieldBuilder = UserBitShared.SerializedField.newBuilder() + .setVarByteLength(0) + .setBufferLength(0) + .setValueCount(0) + .setNamePart(field.getNamePart()) + .setMajorType(field.getMajorType()); + + int index = 0; + for (UserBitShared.SerializedField childField : field.getChildList()) { + // make sure we make a copy of all children, so we do not corrupt the + // original fieldList. This will recursively call itself. + newDummyFieldBuilder.addChild(index, createDummyField(childField)); + index++; + } + + UserBitShared.SerializedField newDummyField = newDummyFieldBuilder.build(); + + return newDummyField; + } + + // Create a dummy field list that we can use for empty batches. 
+ private List<UserBitShared.SerializedField> createDummyFieldList(List<UserBitShared.SerializedField> fieldList) { + List<UserBitShared.SerializedField> dummyFieldList = new ArrayList<UserBitShared.SerializedField>(); + + for (UserBitShared.SerializedField field : fieldList) { + dummyFieldList.add(createDummyField(field)); + } + + return dummyFieldList; + } + @Override public FragmentContext getContext() { return context; @@ -482,7 +563,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } else { state = BatchState.DONE; } - break; } if (batch.getHeader().getDef().getFieldCount() == 0) { @@ -640,7 +720,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// cg.saveCodeForDebugging(true); + // cg.saveCodeForDebugging(true); final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot(); ExpandableHyperContainer batch = null; http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index 2d9973e..108a621 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -51,6 +51,10 @@ public class MockRecordReader extends AbstractRecordReader { } private int getEstimatedRecordSize(MockColumn[] types) { + if (types == null) { + return 0; + } + int x = 0; for (int i = 0; i < types.length; 
i++) { x += TypeHelper.getSize(types[i].getMajorType()); @@ -68,6 +72,9 @@ public class MockRecordReader extends AbstractRecordReader { public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { try { final int estimateRowSize = getEstimatedRecordSize(config.getTypes()); + if (config.getTypes() == null) { + return; + } valueVectors = new ValueVector[config.getTypes().length]; batchRecordCount = 250000 / estimateRowSize; @@ -90,6 +97,11 @@ public class MockRecordReader extends AbstractRecordReader { final int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead); recordsRead += recordSetSize; + + if (valueVectors == null) { + return recordSetSize; + } + for (final ValueVector v : valueVectors) { final ValueVector.Mutator m = v.getMutator(); m.generateTestData(recordSetSize); http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java index cb2995c..f0aa1b7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java @@ -152,4 +152,83 @@ public class TestMergingReceiver extends PopUnitTestBase { assertEquals(100000, count); } } + + @Test + public void handleEmptyBatchNoSchema() throws Exception { + @SuppressWarnings("resource") + final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + final DrillClient 
client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + + bit1.run(); + bit2.run(); + client.connect(); + final List<QueryDataBatch> results = + client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/mergerecv/empty_batch_noschema.json"), + Charsets.UTF_8)); + int count = 0; + final RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); + // print the results + for (final QueryDataBatch b : results) { + final QueryData queryData = b.getHeader(); + batchLoader.load(queryData.getDef(), b.getData()); // loaded but not used, for testing + count += queryData.getRowCount(); + b.release(); + batchLoader.clear(); + } + assertEquals(100000, count); + } + } + + @Test + public void testMultipleProvidersEmptyBatches() throws Exception { + @SuppressWarnings("resource") + final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try (final Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + final Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + final DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + bit1.run(); + bit2.run(); + client.connect(); + final List<QueryDataBatch> results = + client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/mergerecv/multiple_providers_empty_batches.json"), + Charsets.UTF_8)); + int count = 0; + final RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); + // print the results + Long lastBlueValue = null; + for (final QueryDataBatch b : results) { + final QueryData queryData = b.getHeader(); + final int batchRowCount = queryData.getRowCount(); + count += batchRowCount; + batchLoader.load(queryData.getDef(), b.getData()); + for (final VectorWrapper<?> vw : batchLoader) { + @SuppressWarnings("resource") + final ValueVector vv = vw.getValueVector(); + final ValueVector.Accessor va = vv.getAccessor(); 
+ final MaterializedField materializedField = vv.getField(); + final int numValues = va.getValueCount(); + for (int valueIdx = 0; valueIdx < numValues; ++valueIdx) { + if (materializedField.getName().equals("blue")) { + final long longValue = (Long) va.getObject(valueIdx); + // check that order is ascending + if (lastBlueValue != null) { + assertTrue(longValue >= lastBlueValue); + } + lastBlueValue = longValue; + } + } + } + b.release(); + batchLoader.clear(); + } + assertEquals(300000, count); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json b/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json new file mode 100644 index 0000000..6bf7b24 --- /dev/null +++ b/exec/java-exec/src/test/resources/mergerecv/empty_batch_noschema.json @@ -0,0 +1,51 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + entries:[ + {records: 100000, types: [ + {name: "blue", type: "BIGINT", mode: "OPTIONAL"}, + {name: "red", type: "BIGINT", mode: "OPTIONAL"}, + {name: "green", type: "BIGINT", mode: "OPTIONAL"} + ]}, + {records: 0}, + {records: 0, types: [ + {name: "blue", type: "BIGINT", mode: "OPTIONAL"}, + {name: "red", type: "BIGINT", mode: "OPTIONAL"}, + {name: "green", type: "BIGINT", mode: "OPTIONAL"} + ]} + ] + }, + { + @id: 2, + child: 1, + pop: "external-sort", + orderings: [ {expr: "blue", order:"DESC"}, {expr: "red", order:"DESC"} ] + }, + { + @id: 3, + child: 2, + pop:"selection-vector-remover" + }, + { + @id: 4, + child: 3, + pop: "single-merge-exchange", + orderings: [ {expr: "blue", order:"DESC"}, {expr: "red", order:"DESC"} ] + }, + { + @id: 5, + child: 4, + pop: "screen" + } + ] +} \ No newline at end 
of file http://git-wip-us.apache.org/repos/asf/drill/blob/a03f5429/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json b/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json new file mode 100644 index 0000000..5d2feb8 --- /dev/null +++ b/exec/java-exec/src/test/resources/mergerecv/multiple_providers_empty_batches.json @@ -0,0 +1,67 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + + entries:[ + {records: 100000, types: [ + {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2}, + {name: "red", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED", + properties: { a: 10, b: "foo" }} + ]}, + {records: 100000, types: [ + {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2}, + {name: "red", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED", + properties: { a: 10, b: "foo" }} + ]}, + {records: 0}, + {records: 0, types: [ + {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2}, + {name: "red", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED", + properties: { a: 10, b: "foo" }} + ]}, + {records: 100000, types: [ + {name: "blue", type: "BIGINT", mode: "REQUIRED", repeat: 2}, + {name: "red", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED", + properties: { a: 10, b: "foo" }} + ]}, + {records: 0} + ] + }, + { + @id: 2, + child: 1, + pop: "external-sort", + orderings: [ {expr: "blue", order:"ASC"}, {expr: "red", order:"ASC"} ] + }, + { + @id: 3, + child: 2, + pop:"selection-vector-remover" + }, + { + @id: 4, + child: 3, + pop: "single-merge-exchange", + orderings: [ {expr: "blue", order:"ASC"}, {expr: 
"red", order:"ASC"} ] + }, + { + @id: 5, + child: 4, + pop: "screen" + } + ] +} \ No newline at end of file