DRILL-824: MergingRecordBatch.next() fails to reallocate the outgoing vectors if copyFromSafe returns false.
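In short, the fix makes the merge loop in MergingRecordBatch.next() treat a false return from the record copy (i.e. from copyFromSafe) the same way it treats a full outgoing batch: set prevBatchWasFull and break, so that the outgoing vectors are reallocated on the next call before the record is retried; previously the loop broke out without setting the flag, so the reallocation never happened. Below is a minimal, self-contained sketch of that copy-and-flag pattern. The names copyRecordToOutgoingBatch, isOutgoingFull and prevBatchWasFull are taken from the diff further down; the queue of Strings, the capacity counter and the clear()-as-reallocation are simplified stand-ins for illustration only, not Drill's actual vector machinery.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Minimal stand-in for the copy-and-flag pattern in MergingRecordBatch.next().
    // A false return from the copy (Drill's copyFromSafe) means the outgoing
    // vectors ran out of space: the record stays on the queue, prevBatchWasFull
    // is set, and the loop exits so the next call reallocates before retrying.
    public class MergeLoopSketch {
      private final Deque<String> pqueue = new ArrayDeque<>();
      private final Deque<String> outgoing = new ArrayDeque<>();
      private final int outgoingCapacity = 3;     // tiny capacity to force the "full" paths
      private boolean prevBatchWasFull = false;

      private boolean copyRecordToOutgoingBatch(String record) {
        if (outgoing.size() >= outgoingCapacity) {
          return false;                           // analogous to copyFromSafe returning false
        }
        outgoing.add(record);
        return true;
      }

      private boolean isOutgoingFull() {
        return outgoing.size() >= outgoingCapacity;
      }

      public void next() {
        if (prevBatchWasFull) {
          outgoing.clear();                       // stand-in for reallocating the outgoing vectors
          prevBatchWasFull = false;
        }
        while (!pqueue.isEmpty()) {
          String node = pqueue.peek();
          if (!copyRecordToOutgoingBatch(node)) {
            prevBatchWasFull = true;              // the fix: remember the failed copy ...
            break;                                // ... and stop without consuming the record
          }
          pqueue.poll();
          if (isOutgoingFull()) {
            prevBatchWasFull = true;              // batch size reached; reallocate on the next call
          }
        }
        System.out.println("emitted batch: " + outgoing);
      }

      public static void main(String[] args) {
        MergeLoopSketch merger = new MergeLoopSketch();
        for (int i = 0; i < 7; i++) {
          merger.pqueue.add("record-" + i);
        }
        while (!merger.pqueue.isEmpty()) {
          merger.next();                          // emits batches [0-2], [3-5], [6]
        }
      }
    }

The new TestQueriesOnLargeFile test added in this commit is intended to exercise this path by pushing 15,000 generated JSON records through a single-merge-exchange plan (merging_receiver_large_data.json below), which drives the data through MergingRecordBatch.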
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ef28054b Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ef28054b Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ef28054b Branch: refs/heads/master Commit: ef28054b4f8eb9f9b4fbaa069089e18536394632 Parents: e6a6652 Author: vkorukanti <[email protected]> Authored: Fri May 23 09:53:38 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed May 28 09:13:58 2014 -0700 ---------------------------------------------------------------------- .../impl/mergereceiver/MergingRecordBatch.java | 4 +- .../drill/exec/TestQueriesOnLargeFile.java | 118 +++++++++++++++++++ .../complex/writer/TestJsonReaderLargeFile.java | 101 ---------------- .../largefiles/merging_receiver_large_data.json | 113 ++++++++++++++++++ 4 files changed, 234 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index cc38cbe..e3f466a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -253,13 +253,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // pop next value from pq and copy to outgoing batch Node node = pqueue.peek(); if (!copyRecordToOutgoingBatch(node)) { + logger.debug("Outgoing vectors space is full; breaking"); + prevBatchWasFull = true; break; } pqueue.poll(); if (isOutgoingFull()) { // set a flag so that we reallocate on the next iteration - logger.debug("Outgoing vectors are full; breaking"); + logger.debug("Outgoing vectors record batch size reached; breaking"); prevBatchWasFull = true; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java new file mode 100644 index 0000000..879dc3c --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.vector.BigIntVector; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.PrintWriter; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +public class TestQueriesOnLargeFile extends BaseTestQuery { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestQueriesOnLargeFile.class); + + private static File dataFile = null; + private static int NUM_RECORDS = 15000; + + @BeforeClass + public static void generateTestData() throws Exception { + // Generate a json file with NUM_RECORDS number of records + while (true) { + dataFile = File.createTempFile("drill-json", ".json"); + if (dataFile.exists()) { + boolean success = dataFile.delete(); + if (success) { + break; + } + } + logger.trace("retry creating tmp file"); + } + + PrintWriter printWriter = new PrintWriter(dataFile); + + for (int i=1; i<=NUM_RECORDS; i++) { + printWriter.println("{"); + printWriter.println(" \"id\" : " + Math.random() + ","); + printWriter.println(" \"summary\" : \"Apache Drill provides low latency ad-hoc queries to many different data sources, "+ + "including nested data. 
Inspired by Google's Dremel, Drill is designed to scale to 10,000 servers and " + + "query petabytes of data in seconds.\""); + printWriter.println("}"); + } + + printWriter.close(); + } + + @Test + public void testRead() throws Exception { + List<QueryResultBatch> results = testSqlWithResults( + String.format("SELECT count(*) FROM dfs.`default`.`%s`", dataFile.getPath())); + + RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); + + for(QueryResultBatch batch : results) { + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + + if (batchLoader.getRecordCount() <= 0) { + continue; + } + + BigIntVector countV = (BigIntVector) batchLoader.getValueAccessorById(BigIntVector.class, 0).getValueVector(); + assertTrue("Total of "+ NUM_RECORDS + " records expected in count", countV.getAccessor().get(0) == NUM_RECORDS); + + batchLoader.clear(); + batch.release(); + } + } + + @Test + public void testMergingReceiver() throws Exception { + String plan = Files.toString(FileUtils.getResourceAsFile("/largefiles/merging_receiver_large_data.json"), + Charsets.UTF_8).replace("#{TEST_FILE}", dataFile.getPath()); + List<QueryResultBatch> results = testPhysicalWithResults(plan); + + int recordsInOutput = 0; + for(QueryResultBatch batch : results) { + recordsInOutput += batch.getHeader().getDef().getRecordCount(); + batch.release(); + } + + assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", + NUM_RECORDS, recordsInOutput), NUM_RECORDS == recordsInOutput); + } + + @AfterClass + public static void deleteTestData() throws Exception { + if (dataFile != null) { + if (dataFile.exists()) { + org.apache.commons.io.FileUtils.forceDelete(dataFile); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java deleted file mode 100644 index 643100a..0000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.drill.exec.vector.complex.writer; - -import org.apache.commons.io.FileUtils; -import org.apache.drill.BaseTestQuery; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.rpc.user.QueryResultBatch; -import org.apache.drill.exec.vector.BigIntVector; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.io.PrintWriter; -import java.util.List; - -import static org.junit.Assert.assertTrue; - -public class TestJsonReaderLargeFile extends BaseTestQuery { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReaderLargeFile.class); - - private static File dataFile = null; - private static int NUM_RECORDS = 15000; - - @BeforeClass - public static void generateTestData() throws Exception { - // Generate a json file with NUM_RECORDS number of records - while (true) { - dataFile = File.createTempFile("drill-json", ".json"); - if (dataFile.exists()) { - boolean success = dataFile.delete(); - if (success) { - break; - } - } - logger.trace("retry creating tmp file"); - } - - PrintWriter printWriter = new PrintWriter(dataFile); - String record = "{\n" + - "\"project\" : \"Drill\", \n" + - "\"summary\" : \"Apache Drill provides low latency ad-hoc queries to many different data sources, " + - "including nested data. Inspired by Google's Dremel, Drill is designed to scale to 10,000 servers and " + - "query petabytes of data in seconds.\"\n" + - "}"; - - for (int i=1; i<=NUM_RECORDS; i++) { - printWriter.println(record); - } - - printWriter.close(); - } - - @Test - public void testRead() throws Exception { - List<QueryResultBatch> results = testSqlWithResults( - String.format("SELECT count(*) FROM dfs.`default`.`%s`", dataFile.getPath())); - - RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); - - for(QueryResultBatch batch : results) { - batchLoader.load(batch.getHeader().getDef(), batch.getData()); - - if (batchLoader.getRecordCount() <= 0) { - continue; - } - - BigIntVector countV = (BigIntVector) batchLoader.getValueAccessorById(BigIntVector.class, 0).getValueVector(); - assertTrue("Total of "+ NUM_RECORDS + " records expected in count", countV.getAccessor().get(0) == NUM_RECORDS); - - batchLoader.clear(); - batch.release(); - } - } - - @AfterClass - public static void deleteTestData() throws Exception { - if (dataFile != null) { - if (dataFile.exists()) { - FileUtils.forceDelete(dataFile); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json new file mode 100644 index 0000000..3ce5526 --- /dev/null +++ b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json @@ -0,0 +1,113 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + "graph" : [ { + "pop" : "fs-scan", + "@id" : 1, + "files" : [ "#{TEST_FILE}" ], + "storage" : { + "type" : "file", + "connection" : "file:///", + "workspaces" : { + "root" : { + "location" : "/", + "writable" : false, + "storageformat" : null + }, + "tmp" : { + "location" : "/tmp", + "writable" : true, + "storageformat" : "csv" + } + }, + "formats" : { + "psv" : { + "type" : "text", + 
"extensions" : [ "tbl" ], + "delimiter" : "|" + }, + "csv" : { + "type" : "text", + "extensions" : [ "csv" ], + "delimiter" : "," + }, + "tsv" : { + "type" : "text", + "extensions" : [ "tsv" ], + "delimiter" : "\t" + }, + "parquet" : { + "type" : "parquet" + }, + "json" : { + "type" : "json" + } + } + }, + "format" : { + "type" : "json" + }, + "columns" : [ "`id`", "`summary`" ], + "selectionRoot" : "#{TEST_FILE}" + }, { + "pop" : "project", + "@id" : 2, + "exprs" : [ { + "ref" : "`id`", + "expr" : "`id`" + }, { + "ref" : "`summary`", + "expr" : "`summary`" + } ], + "child" : 1, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + }, { + "pop" : "hash-to-random-exchange", + "@id" : 3, + "child" : 2, + "expr" : "hash(`id`) ", + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + }, { + "pop" : "external-sort", + "@id" : 4, + "child" : 3, + "orderings" : [ { + "order" : "ASC", + "expr" : "`id`", + "nullDirection" : "UNSPECIFIED" + } ], + "reverse" : false, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + }, { + "pop" : "selection-vector-remover", + "@id" : 5, + "child" : 4, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + }, { + "pop" : "single-merge-exchange", + "@id" : 6, + "child" : 5, + "orderings" : [ { + "order" : "ASC", + "expr" : "`id`", + "nullDirection" : "UNSPECIFIED" + } ], + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + }, { + "pop" : "screen", + "@id" : 7, + "child" : 6, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + } ] +} \ No newline at end of file
