This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 9993fa3547b029db5fe33a2210fa6f07e8ac1990 Author: Paul Rogers <par0...@yahoo.com> AuthorDate: Sat Oct 5 18:57:14 2019 -0700 DRILL-7358: Fix COUNT(*) for empty text files Fixes a subtle error when a text file has a header (and so has a schema) but is read by a COUNT(*) query, so that no columns are projected. Ensures that, in this case, an empty schema is treated as a valid result set. Tests: updated CSV tests to include this case. closes #1867 --- .../exec/physical/impl/protocol/SchemaTracker.java | 2 +- .../impl/scan/framework/SchemaNegotiatorImpl.java | 2 +- .../scan/project/projSet/EmptyProjectionSet.java | 3 + .../project/projSet/ExplicitProjectionSet.java | 7 +- .../project/projSet/WildcardProjectionSet.java | 7 ++ .../exec/physical/resultSet/ProjectionSet.java | 1 + .../exec/physical/resultSet/impl/ColumnState.java | 3 +- .../resultSet/impl/ResultSetLoaderImpl.java | 11 +++ .../store/easy/text/reader/BaseFieldOutput.java | 2 +- .../easy/text/compliant/TestCsvWithHeaders.java | 90 ++++++++++++++++++++-- .../java/org/apache/drill/test/QueryBuilder.java | 10 ++- 11 files changed, 120 insertions(+), 18 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java index 0494498..4149850 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java @@ -65,7 +65,7 @@ public class SchemaTracker { private List<ValueVector> currentVectors = new ArrayList<>(); public void trackSchema(VectorContainer newBatch) { - if (! isSameSchema(newBatch)) { + if (schemaVersion == 0 || ! 
isSameSchema(newBatch)) { schemaVersion++; captureSchema(newBatch); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java index 8c934b5..e1302218 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java @@ -92,7 +92,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { @Override public void setTableSchema(TupleMetadata schema, boolean isComplete) { tableSchema = schema; - this.isSchemaComplete = schema != null && isComplete; + isSchemaComplete = schema != null && isComplete; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java index 1345006..016cc63 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java @@ -37,4 +37,7 @@ public class EmptyProjectionSet implements ProjectionSet { @Override public void setErrorContext(CustomErrorContext errorContext) { } + + @Override + public boolean isEmpty() { return true; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java index 94114f5..cfa82dd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java @@ -26,6 +26,8 @@ import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.Requested import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Projection set based on an explicit set of columns provided @@ -34,7 +36,7 @@ import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory; */ public class ExplicitProjectionSet extends AbstractProjectionSet { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitProjectionSet.class); + private static final Logger logger = LoggerFactory.getLogger(ExplicitProjectionSet.class); private final RequestedTuple requestedProj; @@ -106,4 +108,7 @@ public class ExplicitProjectionSet extends AbstractProjectionSet { .addContext(errorContext) .build(logger); } + + @Override + public boolean isEmpty() { return requestedProj.projections().isEmpty(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java index dc4858b..640a371 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java @@ -52,4 +52,11 @@ public class WildcardProjectionSet extends AbstractProjectionSet { return new ProjectedReadColumn(col, null, outputSchema, conv); } } + + // Wildcard means use whatever schema is provided by the reader, + // so the projection itself is non-empty even if the 
reader has no + // columns. + + @Override + public boolean isEmpty() { return false; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java index 6956713..298c0e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java @@ -102,4 +102,5 @@ public interface ProjectionSet { void setErrorContext(CustomErrorContext errorContext); ColumnReadProjection readProjection(ColumnMetadata col); + boolean isEmpty(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java index d68b246..82c37ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnState.java @@ -193,8 +193,7 @@ public abstract class ColumnState { this.vectorState = vectorState; addVersion = writer.isProjected() ? loader.bumpVersion() : loader.activeSchemaVersion(); - state = loader.hasOverflow() ? - State.NEW_LOOK_AHEAD : State.NORMAL; + state = loader.hasOverflow() ? 
State.NEW_LOOK_AHEAD : State.NORMAL; this.writer = writer; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java index c639ad5..2cfdeec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java @@ -312,6 +312,17 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals { logger.debug("Schema: " + options.schema.toString()); new BuildFromSchema().buildTuple(rootWriter, options.schema); } + + // If we want to project nothing, then we do, in fact, have a + // valid schema, it just happens to be an empty schema. Bump the + // schema version so we know we have that empty schema. + // + // This accomplishes a result similar to the "legacy" readers + // achieve by adding a dummy column. 
+ + if (projectionSet.isEmpty()) { + bumpVersion(); + } } private void updateCardinality() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java index 9f88e5d..6e74d36 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java @@ -154,7 +154,7 @@ public abstract class BaseFieldOutput extends TextOutput { } ScalarWriter colWriter = columnWriter(); if (fieldWriteCount == 0) { - colWriter.setBytes(fieldBytes, currentDataPointer); + colWriter.setBytes(fieldBytes, currentDataPointer); } else { colWriter.appendBytes(fieldBytes, currentDataPointer); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java index 00a7a75..171669a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java @@ -17,12 +17,6 @@ */ package org.apache.drill.exec.store.easy.text.compliant; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -32,17 +26,23 @@ import java.util.Iterator; import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.record.metadata.SchemaBuilder; -import 
org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.physical.rowSet.DirectRowSet; import org.apache.drill.exec.physical.rowSet.RowSet; import org.apache.drill.exec.physical.rowSet.RowSetBuilder; import org.apache.drill.exec.physical.rowSet.RowSetReader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.test.rowSet.RowSetUtilities; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Sanity test of CSV files with headers. * <p> @@ -67,6 +67,8 @@ public class TestCsvWithHeaders extends BaseCsvTest { private static final String TEST_FILE_NAME = "basic.csv"; private static final String COLUMNS_FILE_NAME = "columns.csv"; private static final String EMPTY_HEADERS_FILE = "noHeaders.csv"; + private static final String EMPTY_BODY_FILE = "noData.csv"; + private static final String COUNT_STAR = "SELECT COUNT(*) FROM `dfs.data`.`%s`"; private static String[] invalidHeaders = { "$,,9b,c,c,c_2", @@ -78,6 +80,10 @@ public class TestCsvWithHeaders extends BaseCsvTest { "10,foo,bar" }; + private static String[] emptyBody = { + "a,b,c", + }; + private static String[] raggedRows = { "a,b,c", "10,dino", @@ -97,6 +103,7 @@ public class TestCsvWithHeaders extends BaseCsvTest { buildFile(TEST_FILE_NAME, validHeaders); buildNestedTable(); buildFile(COLUMNS_FILE_NAME, columnsCol); + buildFile(EMPTY_BODY_FILE, emptyBody); } /** @@ -119,6 +126,11 @@ public class TestCsvWithHeaders extends BaseCsvTest { buildFile(EMPTY_FILE, new String[] {}); RowSet rowSet = client.queryBuilder().sql(makeStatement(EMPTY_FILE)).rowSet(); assertNull(rowSet); + + // Try again with COUNT(*) + + long count = 
client.queryBuilder().sql(COUNT_STAR, EMPTY_FILE).singletonLong(); + assertEquals(0, count); } /** @@ -137,6 +149,49 @@ public class TestCsvWithHeaders extends BaseCsvTest { } @Test + public void testHeadersNoNewline() throws IOException { + String fileName = "headerNoNewline.csv"; + try (PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) { + out.print("a,b,c"); // note: no \n in the end + } + RowSet rowSet = client.queryBuilder().sql(makeStatement(EMPTY_BODY_FILE)).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + + /** + * A file with a header has a schema, but has no rows. This is different than + * the empty file case because we do, in fact, know the schema. + */ + @Test + public void testEmptyBody() throws IOException { + buildFile(EMPTY_BODY_FILE, emptyBody); + + // SELECT * query: expect schema-only result. 
+ + RowSet rowSet = client.queryBuilder().sql(makeStatement(EMPTY_BODY_FILE)).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + RowSetUtilities.verify(expected, rowSet); + + // Try again with COUNT(*) + + long count = client.queryBuilder().sql(COUNT_STAR, EMPTY_BODY_FILE).singletonLong(); + assertEquals(0, count); + } + + @Test public void testValidCsvHeaders() throws IOException { RowSet actual = client.queryBuilder().sql(makeStatement(TEST_FILE_NAME)).rowSet(); @@ -210,6 +265,25 @@ public class TestCsvWithHeaders extends BaseCsvTest { RowSetUtilities.verify(expected, actual); } + @Test + public void testDataNoNewline() throws IOException { + String fileName = "dataNoNewline.csv"; + try (PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) { + out.println("a,b,c"); + out.print("fred,barney,wilma"); // note: no \n in the end + } + RowSet rowSet = client.queryBuilder().sql(makeStatement(fileName)).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("fred", "barney", "wilma") + .build(); + RowSetUtilities.verify(expected, rowSet); + } + /** * Verify that implicit columns are recognized and populated. Sanity test * of just one implicit column. 
V3 uses non-nullable VARCHAR for file diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java index eb29426..94f5d85 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java @@ -36,6 +36,9 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.client.LoggingResultsListener; import org.apache.drill.exec.client.QuerySubmitter.Format; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.physical.rowSet.DirectRowSet; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetReader; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; @@ -56,9 +59,6 @@ import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.test.BufferingQueryEventListener.QueryEvent; import org.apache.drill.test.ClientFixture.StatementParser; -import org.apache.drill.exec.physical.rowSet.DirectRowSet; -import org.apache.drill.exec.physical.rowSet.RowSet; -import org.apache.drill.exec.physical.rowSet.RowSetReader; import org.joda.time.Period; /** @@ -478,7 +478,9 @@ public class QueryBuilder { } try { RowSetReader reader = rowSet.reader(); - reader.next(); + if (!reader.next()) { + throw new IllegalStateException("No rows returned"); + } return scalarReader.read(reader.scalar(0)); } finally { rowSet.clear();