This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 8a7007f03397849b555a74297a6a637293958cc5 Author: Paul Rogers <par0...@yahoo.com> AuthorDate: Thu May 30 18:43:09 2019 -0700 DRILL-7258: Remove field width limit for text reader The V2 text reader enforced a limit of 64K characters when using column headers, but not when using the columns[] array. The V3 reader enforced the 64K limit in both cases. This patch removes the limit in both cases. The limit now is the 16MB vector size limit. With headers, no one column can exceed 16MB. With the columns[] array, no one row can exceed 16MB. (The 16MB limit is set by the Netty memory allocator.) Added an "appendBytes()" method to the scalar column writer which adds additional bytes to those already written for a specific column or array element value. The method is implemented for VarChar, Var16Char and VarBinary vectors. It throws an exception for all other types. When used with a type conversion shim, the appendBytes() method throws an exception. This should be OK because the previous setBytes() would already have failed, since a huge value is not acceptable for numeric or date type conversions. Added unit tests of the append feature, and for the append feature in the batch overflow case (when appending bytes causes the vector or batch to overflow). Also added tests to verify the lack of a column width limit with the text reader, both with and without headers. 
closes #1802 --- .../exec/physical/rowSet/impl/WriterIndexImpl.java | 3 + .../easy/text/compliant/v3/BaseFieldOutput.java | 52 ++++++++++--- .../easy/text/compliant/v3/FieldVarCharOutput.java | 12 +-- .../text/compliant/v3/RepeatedVarCharOutput.java | 7 +- .../store/easy/text/compliant/v3/TextInput.java | 9 +-- .../rowSet/impl/TestResultSetLoaderOverflow.java | 76 +++++++++++++++++- .../store/easy/text/compliant/BaseCsvTest.java | 20 +++++ .../easy/text/compliant/TestCsvWithHeaders.java | 25 ++++++ .../easy/text/compliant/TestCsvWithoutHeaders.java | 29 +++++++ .../apache/drill/test/rowSet/RowSetWriterImpl.java | 3 + .../drill/test/rowSet/test/PerformanceTool.java | 5 +- .../test/rowSet/test/TestFixedWidthWriter.java | 3 + .../test/rowSet/test/TestScalarAccessors.java | 89 ++++++++++++++++++++++ .../main/codegen/templates/ColumnAccessors.java | 11 +++ .../exec/vector/accessor/ColumnWriterIndex.java | 13 +++- .../drill/exec/vector/accessor/ScalarWriter.java | 1 + .../accessor/convert/AbstractWriteConverter.java | 5 ++ .../accessor/writer/AbstractArrayWriter.java | 7 +- .../vector/accessor/writer/BaseScalarWriter.java | 5 ++ .../vector/accessor/writer/BaseVarWidthWriter.java | 7 ++ .../exec/vector/accessor/writer/MapWriter.java | 1 + .../accessor/writer/NullableScalarWriter.java | 8 ++ .../accessor/writer/OffsetVectorWriterImpl.java | 6 ++ .../vector/accessor/writer/ScalarArrayWriter.java | 3 + .../accessor/writer/dummy/DummyScalarWriter.java | 3 + 25 files changed, 376 insertions(+), 27 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java index 9fb3e4e..6119791 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java @@ -106,6 +106,9 @@ class WriterIndexImpl 
implements ColumnWriterIndex { public void nextElement() { } @Override + public void prevElement() { } + + @Override public ColumnWriterIndex outerIndex() { return null; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java index 6bf0bb6..5dd4284 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java @@ -17,13 +17,17 @@ */ package org.apache.drill.exec.store.easy.text.compliant.v3; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.vector.accessor.ScalarWriter; public abstract class BaseFieldOutput extends TextOutput { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); - private static final int MAX_FIELD_LENGTH = 1024 * 64; + /** + * Width of the per-field data buffer. Fields can be larger. + * In that case, subsequent buffers are appended to the vector + * to form the full field. + */ + private static final int BUFFER_LEN = 1024; // track which field is getting appended protected int currentFieldIndex = -1; @@ -31,6 +35,8 @@ public abstract class BaseFieldOutput extends TextOutput { protected int currentDataPointer; // track if field is still getting appended private boolean fieldOpen = true; + // number of bytes written to field thus far + protected int fieldWriteCount; // holds chars for a field protected byte[] fieldBytes; protected final RowSetLoader writer; @@ -84,7 +90,7 @@ public abstract class BaseFieldOutput extends TextOutput { // If we project at least one field, allocate a buffer. 
if (maxField >= 0) { - fieldBytes = new byte[MAX_FIELD_LENGTH]; + fieldBytes = new byte[BUFFER_LEN]; } } @@ -104,6 +110,7 @@ public abstract class BaseFieldOutput extends TextOutput { assert index == currentFieldIndex + 1; currentFieldIndex = index; currentDataPointer = 0; + fieldWriteCount = 0; fieldOpen = true; // Figure out if this field is projected. @@ -122,18 +129,41 @@ public abstract class BaseFieldOutput extends TextOutput { if (! fieldProjected) { return; } - if (currentDataPointer >= MAX_FIELD_LENGTH - 1) { - throw UserException - .unsupportedError() - .message("Text column is too large.") - .addContext("Column", currentFieldIndex) - .addContext("Limit", MAX_FIELD_LENGTH) - .build(logger); + if (currentDataPointer >= BUFFER_LEN - 1) { + writeToVector(); } fieldBytes[currentDataPointer++] = data; } + + /** + * Write a buffer of data to the underlying vector using the + * column writer. The buffer holds a complete or partial chunk + * of data for the field. If this is the first data for the field, + * write the bytes. If this is a second buffer for the same field, + * append the bytes. The append will work if the underlying vector + * is VarChar, it will fail if a type conversion shim is in between. + * (This is generally OK because the previous setBytes should have + * failed because a large int or date is not supported.) 
+ */ + + protected void writeToVector() { + if (!fieldProjected) { + return; + } + ScalarWriter colWriter = columnWriter(); + if (fieldWriteCount == 0) { + colWriter.setBytes(fieldBytes, currentDataPointer); + } else { + colWriter.appendBytes(fieldBytes, currentDataPointer); + } + fieldWriteCount += currentDataPointer; + currentDataPointer = 0; + } + + protected abstract ScalarWriter columnWriter(); + @Override public boolean endField() { fieldOpen = false; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java index df48a55..482c5cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.easy.text.compliant.v3; import org.apache.drill.exec.physical.rowSet.RowSetLoader; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ScalarWriter; /** * Class is responsible for generating record batches for text file inputs. 
We generate @@ -52,11 +53,12 @@ class FieldVarCharOutput extends BaseFieldOutput { @Override public boolean endField() { - if (fieldProjected) { - writer.scalar(currentFieldIndex) - .setBytes(fieldBytes, currentDataPointer); - } - + writeToVector(); return super.endField(); } + + @Override + protected ScalarWriter columnWriter() { + return writer.scalar(currentFieldIndex); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java index 13b4450..f7f1035 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java @@ -120,7 +120,7 @@ public class RepeatedVarCharOutput extends BaseFieldOutput { // Save the field. - columnWriter.setBytes(fieldBytes, currentDataPointer); + writeToVector(); } else { // The field is not projected. 
@@ -134,4 +134,9 @@ public class RepeatedVarCharOutput extends BaseFieldOutput { return super.endField(); } + + @Override + protected ScalarWriter columnWriter() { + return columnWriter; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java index 26fade6..951bc81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java @@ -17,22 +17,21 @@ */ package org.apache.drill.exec.store.easy.text.compliant.v3; -import io.netty.buffer.DrillBuf; -import io.netty.util.internal.PlatformDependent; +import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; - -import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; +import io.netty.buffer.DrillBuf; +import io.netty.util.internal.PlatformDependent; /** * Class that fronts an InputStream to provide a byte consumption interface. 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java index a82e3c3..3f0989d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java @@ -37,12 +37,12 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ArrayReader; import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetReader; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.drill.shaded.guava.com.google.common.base.Charsets; /** * Exercise the vector overflow functionality for the result set loader. @@ -706,4 +706,78 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest { rsLoader.close(); } + + @Test + public void testVectorSizeLimitWithAppend() { + TupleMetadata schema = new SchemaBuilder() + .add("s", MinorType.VARCHAR) + .buildSchema(); + ResultSetOptions options = new OptionBuilder() + .setRowCountLimit(ValueVector.MAX_ROW_COUNT) + .setSchema(schema) + .build(); + ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options); + RowSetLoader rootWriter = rsLoader.writer(); + + rsLoader.startBatch(); + byte head[] = "abc".getBytes(); + byte tail[] = new byte[523]; + Arrays.fill(tail, (byte) 'X'); + int count = 0; + ScalarWriter colWriter = rootWriter.scalar(0); + while (! 
rootWriter.isFull()) { + rootWriter.start(); + colWriter.setBytes(head, head.length); + colWriter.appendBytes(tail, tail.length); + colWriter.appendBytes(tail, tail.length); + rootWriter.save(); + count++; + } + + // Number of rows should be driven by vector size. + // Our row count should include the overflow row + + int valueLength = head.length + 2 * tail.length; + int expectedCount = ValueVector.MAX_BUFFER_SIZE / valueLength; + assertEquals(expectedCount + 1, count); + + // Loader's row count should include only "visible" rows + + assertEquals(expectedCount, rootWriter.rowCount()); + + // Total count should include invisible and look-ahead rows. + + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + + // Result should exclude the overflow row + + RowSet result = fixture.wrap(rsLoader.harvest()); + assertEquals(expectedCount, result.rowCount()); + + // Verify that the values were, in fact, appended. + + String expected = new String(head, Charsets.UTF_8); + expected += new String(tail, Charsets.UTF_8); + expected += new String(tail, Charsets.UTF_8); + RowSetReader reader = result.reader(); + while (reader.next()) { + assertEquals(expected, reader.scalar(0).getString()); + } + result.clear(); + + // Next batch should start with the overflow row + + rsLoader.startBatch(); + assertEquals(1, rootWriter.rowCount()); + assertEquals(expectedCount + 1, rsLoader.totalRowCount()); + result = fixture.wrap(rsLoader.harvest()); + assertEquals(1, result.rowCount()); + reader = result.reader(); + while (reader.next()) { + assertEquals(expected, reader.scalar(0).getString()); + } + result.clear(); + + rsLoader.close(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java index c2aeac6..2819aa8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java @@ -29,6 +29,8 @@ import org.apache.drill.test.ClusterTest; public class BaseCsvTest extends ClusterTest { + protected final int BIG_COL_SIZE = 70_000; + protected static final String PART_DIR = "root"; protected static final String NESTED_DIR = "nested"; protected static final String ROOT_FILE = "first.csv"; @@ -118,4 +120,22 @@ public class BaseCsvTest extends ClusterTest { } } } + protected String buildBigColFile(boolean withHeader) throws IOException { + String fileName = "hugeCol.csv"; + try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) { + if (withHeader) { + out.println("id,big,n"); + } + for (int i = 0; i < 10; i++) { + out.print(i + 1); + out.print(","); + for (int j = 0; j < BIG_COL_SIZE; j++) { + out.print((char) ((j + i) % 26 + 'A')); + } + out.print(","); + out.println((i + 1) * 10); + } + } + return fileName; + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java index 784c4be..645af30 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java @@ -981,4 +981,29 @@ public class TestCsvWithHeaders extends BaseCsvTest { resetV3(); } } + + @Test + public void testHugeColumn() throws IOException { + String fileName = buildBigColFile(true); + try { + enableV3(true); + String sql = "SELECT * FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet(); + assertEquals(10, actual.rowCount()); + RowSetReader reader = actual.reader(); + while (reader.next()) { + int i = reader.logicalIndex(); + assertEquals(Integer.toString(i + 1), reader.scalar(0).getString()); + String big = 
reader.scalar(1).getString(); + assertEquals(BIG_COL_SIZE, big.length()); + for (int j = 0; j < BIG_COL_SIZE; j++) { + assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j)); + } + assertEquals(Integer.toString((i + 1) * 10), reader.scalar(2).getString()); + } + actual.clear(); + } finally { + resetV3(); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java index ec6810d..2d68a01 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java @@ -442,4 +442,33 @@ public class TestCsvWithoutHeaders extends BaseCsvTest { resetV3(); } } + + @Test + public void testHugeColumn() throws IOException { + String fileName = buildBigColFile(false); + try { + enableV3(true); + String sql = "SELECT * FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet(); + assertEquals(10, actual.rowCount()); + RowSetReader reader = actual.reader(); + ArrayReader arrayReader = reader.array(0); + while (reader.next()) { + int i = reader.logicalIndex(); + arrayReader.next(); + assertEquals(Integer.toString(i + 1), arrayReader.scalar().getString()); + arrayReader.next(); + String big = arrayReader.scalar().getString(); + assertEquals(BIG_COL_SIZE, big.length()); + for (int j = 0; j < BIG_COL_SIZE; j++) { + assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j)); + } + arrayReader.next(); + assertEquals(Integer.toString((i + 1) * 10), arrayReader.scalar().getString()); + } + actual.clear(); + } finally { + resetV3(); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java index 8ba1f93..6512d62 
100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java @@ -81,6 +81,9 @@ public class RowSetWriterImpl extends AbstractTupleWriter implements RowSetWrite public final void nextElement() { } @Override + public final void prevElement() { } + + @Override public void rollover() { throw new UnsupportedOperationException("Rollover not supported in the row set writer."); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java index fa92c09..c810f93 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java @@ -34,8 +34,8 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter; import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter; import org.apache.drill.exec.vector.accessor.writer.ScalarArrayWriter; -import org.apache.drill.test.OperatorFixture; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.drill.test.OperatorFixture; /** * Tests the performance of the writers compared to using the value @@ -180,6 +180,9 @@ public class PerformanceTool { public final void nextElement() { index++; } @Override + public final void prevElement() { } + + @Override public void rollover() { } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java index 3eba578..f7304e9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java @@ -56,6 +56,9 @@ public class TestFixedWidthWriter extends SubOperatorTest { public void nextElement() { } @Override + public void prevElement() { } + + @Override public void rollover() { } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java index 582c2f4..cb11af0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java @@ -18,6 +18,7 @@ package org.apache.drill.test.rowSet.test; import static org.apache.drill.test.rowSet.RowSetUtilities.dec; +import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -39,14 +40,19 @@ import org.apache.drill.exec.vector.DateUtilities; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ArrayReader; +import org.apache.drill.exec.vector.accessor.ArrayWriter; import org.apache.drill.exec.vector.accessor.ScalarReader; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.ValueType; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.DirectRowSet; +import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.RowSetUtilities; +import org.apache.drill.test.rowSet.RowSetWriter; import 
org.joda.time.DateTimeZone; import org.joda.time.Instant; import org.joda.time.LocalDate; @@ -1774,4 +1780,87 @@ public class TestScalarAccessors extends SubOperatorTest { } rsb.build().clear(); } + + /** + * Test the ability to append bytes to a VarChar column. Should work for + * Var16Char, but that type is not yet supported in Drill. + */ + + @Test + public void testAppend() { + doTestAppend(new SchemaBuilder() + .add("col", MinorType.VARCHAR) + .buildSchema()); + doTestAppend(new SchemaBuilder() + .addNullable("col", MinorType.VARCHAR) + .buildSchema()); + } + + private void doTestAppend(TupleMetadata schema) { + DirectRowSet rs = DirectRowSet.fromSchema(fixture.allocator(), schema); + RowSetWriter writer = rs.writer(100); + ScalarWriter colWriter = writer.scalar("col"); + + byte first[] = "abc".getBytes(); + byte second[] = "12345".getBytes(); + colWriter.setBytes(first, first.length); + colWriter.appendBytes(second, second.length); + writer.save(); + colWriter.setBytes(second, second.length); + colWriter.appendBytes(first, first.length); + writer.save(); + colWriter.setBytes(first, first.length); + colWriter.appendBytes(second, second.length); + writer.save(); + RowSet actual = writer.done(); + + RowSet expected = new RowSetBuilder(fixture.allocator(), schema) + .addSingleCol("abc12345") + .addSingleCol("12345abc") + .addSingleCol("abc12345") + .build(); + + RowSetUtilities.verify(expected, actual); + } + + /** + * Test the ability to append bytes to a VarChar column. Should work for + * Var16Char, but that type is not yet supported in Drill. 
+ */ + + @Test + public void testAppendWithArray() { + TupleMetadata schema = new SchemaBuilder() + .addArray("col", MinorType.VARCHAR) + .buildSchema(); + + DirectRowSet rs = DirectRowSet.fromSchema(fixture.allocator(), schema); + RowSetWriter writer = rs.writer(100); + ArrayWriter arrayWriter = writer.array("col"); + ScalarWriter colWriter = arrayWriter.scalar(); + + byte first[] = "abc".getBytes(); + byte second[] = "12345".getBytes(); + for (int i = 0; i < 3; i++) { + colWriter.setBytes(first, first.length); + colWriter.appendBytes(second, second.length); + arrayWriter.save(); + colWriter.setBytes(second, second.length); + colWriter.appendBytes(first, first.length); + arrayWriter.save(); + colWriter.setBytes(first, first.length); + colWriter.appendBytes(second, second.length); + arrayWriter.save(); + writer.save(); + } + RowSet actual = writer.done(); + + RowSet expected = new RowSetBuilder(fixture.allocator(), schema) + .addSingleCol(strArray("abc12345", "12345abc", "abc12345")) + .addSingleCol(strArray("abc12345", "12345abc", "abc12345")) + .addSingleCol(strArray("abc12345", "12345abc", "abc12345")) + .build(); + + RowSetUtilities.verify(expected, actual); + } } diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java index 0891e13..79f352a 100644 --- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java +++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java @@ -415,6 +415,17 @@ public class ColumnAccessors { buf.writerIndex(VALUE_WIDTH); } </#if> + + <#if drillType == "VarChar" || drillType == "Var16Char" || drillType == "VarBinary"> + @Override + public final void appendBytes(final byte[] value, final int len) { + vectorIndex.prevElement(); + final int offset = prepareAppend(len); + drillBuf.setBytes(offset, value, 0, len); + offsetsWriter.reviseOffset(offset + len); + vectorIndex.nextElement(); + } + </#if> <#if drillType == "VarChar"> @Override diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java index 7e225c9..cdeb0df 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java @@ -48,14 +48,23 @@ public interface ColumnWriterIndex { int vectorIndex(); /** - * Index for array elements that allows the caller to increment the - * index. For arrays, writing (or saving) one value automatically + * Increment the index for an array. + * For arrays, writing (or saving) one value automatically * moves to the next value. Ignored for non-element indexes. */ void nextElement(); /** + * Decrement the index for an array. Used exclusively for + * appending bytes to a VarChar, Var16Char or VarBytes + * column. Assumed to be followed by another call + * to nextElement(). + */ + + void prevElement(); + + /** * When handling overflow, the index must be reset so that the current row * starts at the start of the vector. Relative offsets must be preserved. 
* (That is, if the current write position for an array is four greater than diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java index 55a645e..44a4847 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java @@ -70,6 +70,7 @@ public interface ScalarWriter extends ColumnWriter { void setDouble(double value); void setString(String value); void setBytes(byte[] value, int len); + void appendBytes(byte[] value, int len); void setDecimal(BigDecimal value); void setPeriod(Period value); void setDate(LocalDate value); diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java index b98e8e0..f92eed6 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java @@ -110,6 +110,11 @@ public abstract class AbstractWriteConverter extends AbstractScalarWriter { } @Override + public void appendBytes(byte[] value, int len) { + throw conversionError("bytes"); + } + + @Override public void setDecimal(BigDecimal value) { baseWriter.setDecimal(value); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java index 1b42169..b8ec266 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java @@ -144,7 +144,12 @@ public abstract class 
AbstractArrayWriter implements ArrayWriter, WriterEvents { @Override public void nextElement() { } - public void next() { elementIndex++; } + @Override + public void prevElement() { } + + protected void next() { elementIndex++; } + + protected void prev() { elementIndex--; } public int valueStartOffset() { return offsetsWriter.nextOffset(); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java index 0083ece..8dc85cf 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java @@ -264,6 +264,11 @@ public abstract class BaseScalarWriter extends AbstractScalarWriterImpl { } @Override + public void appendBytes(byte[] value, int len) { + throw conversionError("bytes"); + } + + @Override public void setDecimal(BigDecimal value) { throw conversionError("Decimal"); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java index 70de95a..0bac916 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java @@ -85,6 +85,13 @@ public abstract class BaseVarWidthWriter extends BaseScalarWriter { return offsetsWriter.nextOffset; } + protected final int prepareAppend(final int width) { + // No fill empties needed: must have been done + // on previous setBytes() call. 
+ + return writeOffset(width); + } + @Override protected final void setBuffer() { drillBuf = vector().getBuffer(); diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java index 82e90e9..fd6e7c4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java @@ -51,6 +51,7 @@ public abstract class MapWriter extends AbstractTupleWriter { @Override public int rowStartIndex() { return baseIndex.rowStartIndex(); } @Override public int vectorIndex() { return baseIndex.vectorIndex(); } @Override public void nextElement() { } + @Override public void prevElement() { } @Override public void rollover() { } @Override public ColumnWriterIndex outerIndex() { diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java index be3a3e4..856a44b 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java @@ -60,6 +60,9 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl { } @Override + public void prevElement() { } + + @Override public void rollover() { parentIndex.rollover(); } @@ -180,6 +183,11 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl { } @Override + public void appendBytes(byte[] value, int len) { + baseWriter.appendBytes(value, len); + } + + @Override public void setDecimal(BigDecimal value) { baseWriter.setDecimal(value); isSetWriter.setInt(1); diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java index f4ee0ab..1da362a 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java @@ -253,6 +253,12 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements nextOffset = newOffset; } + public final void reviseOffset(final int newOffset) { + final int writeIndex = vectorIndex.vectorIndex() + 1; + drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset); + nextOffset = newOffset; + } + public final void fillOffset(final int newOffset) { drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, newOffset); nextOffset = newOffset; diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java index 8bacdf4..4df8721 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java @@ -60,6 +60,9 @@ public class ScalarArrayWriter extends BaseArrayWriter { @Override public final void nextElement() { next(); } + + @Override + public final void prevElement() { prev(); } } private final ScalarWriter elementWriter; diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java index 852bd0d..2d52c3e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java @@ -72,6 +72,9 @@ public class DummyScalarWriter extends 
AbstractScalarWriterImpl { public void setBytes(byte[] value, int len) { } @Override + public void appendBytes(byte[] value, int len) { } + + @Override public void setDecimal(BigDecimal value) { } @Override