This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit a13608b8d1f8110d7bd944c10259b2252dfbf18b Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> AuthorDate: Thu Oct 10 15:43:33 2019 +0300 DRILL-6096: Provide mechanism to configure text writer configuration 1. Usage of format plugin configuration allows to specify line and field delimiters, quotes and escape characters. 2. Usage of system / session options allows to specify if writer should add headers, force quotes. closes #1873 --- exec/java-exec/pom.xml | 2 +- .../templates/StringOutputRecordWriter.java | 12 +- .../java/org/apache/drill/exec/ExecConstants.java | 7 + .../impl/scan/columns/ColumnsArrayParser.java | 13 +- .../exec/server/options/SystemOptionManager.java | 2 + .../exec/store/easy/text/TextFormatPlugin.java | 53 +++-- .../store/easy/text/reader/BaseFieldOutput.java | 2 +- .../store/easy/text/reader/FieldVarCharOutput.java | 13 +- .../exec/store/easy/text/reader/HeaderBuilder.java | 26 +- .../easy/text/reader/RepeatedVarCharOutput.java | 21 +- .../exec/store/easy/text/reader/TextInput.java | 13 +- .../exec/store/easy/text/reader/TextOutput.java | 54 ++--- .../store/easy/text/reader/TextParsingContext.java | 119 ++++++---- .../easy/text/reader/TextParsingSettings.java | 2 +- .../exec/store/easy/text/reader/TextReader.java | 31 +-- .../store/easy/text/writer/TextRecordWriter.java | 121 ++++++---- .../java-exec/src/main/resources/drill-module.conf | 2 + .../exec/physical/impl/writer/TestTextWriter.java | 264 +++++++++++++++++++++ .../text/compliant/TestCsvTableProperties.java | 76 ++++-- .../java/org/apache/drill/test/ClusterFixture.java | 94 +++++--- 20 files changed, 667 insertions(+), 260 deletions(-) diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index a9ac685..9f1ace3 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -89,7 +89,7 @@ <dependency> <groupId>com.univocity</groupId> <artifactId>univocity-parsers</artifactId> - <version>1.3.0</version> + <version>2.8.3</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java index 1bf1b09..017fda4 100644 --- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java @@ -43,7 +43,7 @@ import java.util.Map; /** * Abstract implementation of RecordWriter interface which exposes interface: - * {@link #writeHeader(List)} + * {@link #startNewSchema(BatchSchema)} * {@link #addField(int,String)} * to output the data in string format instead of implementing addField for each type holder. * @@ -60,13 +60,7 @@ public abstract class StringOutputRecordWriter extends AbstractRecordWriter { @Override public void updateSchema(VectorAccessible batch) throws IOException { - BatchSchema schema = batch.getSchema(); - List<String> columnNames = Lists.newArrayList(); - for (int i=0; i < schema.getFieldCount(); i++) { - columnNames.add(schema.getColumn(i).getName()); - } - - startNewSchema(columnNames); + startNewSchema(batch.getSchema()); } @Override @@ -160,6 +154,6 @@ public abstract class StringOutputRecordWriter extends AbstractRecordWriter { public void cleanup() throws IOException { } - public abstract void startNewSchema(List<String> columnNames) throws IOException; + public abstract void startNewSchema(BatchSchema schema) throws IOException; public abstract void addField(int fieldId, String value) throws IOException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 066d04d..20668b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -431,6 +431,13 @@ public final class ExecConstants { public static final DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator("store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE, new OptionDescription("Estimate of the row size in a delimited text file, such as csv. The closer to actual, the better the query plan. Used for all csv files in the system/session where the value is set. Impacts the decision to plan a broadcast join or not.")); + public static final String TEXT_WRITER_ADD_HEADER = "store.text.writer.add_header"; + public static final BooleanValidator TEXT_WRITER_ADD_HEADER_VALIDATOR = new BooleanValidator(TEXT_WRITER_ADD_HEADER, + new OptionDescription("Enables the TEXT writer to write header in newly created file. Default is true. (Drill 1.17+)")); + + public static final String TEXT_WRITER_FORCE_QUOTES = "store.text.writer.force_quotes"; + public static final BooleanValidator TEXT_WRITER_FORCE_QUOTES_VALIDATOR = new BooleanValidator(TEXT_WRITER_FORCE_QUOTES, + new OptionDescription("Enables the TEXT writer to enclose in quotes all fields. Default is false. (Drill 1.17+)")); /** * Json writer option for writing `NaN` and `Infinity` tokens as numbers (not enclosed with double quotes) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java index ceb0848..ffd69e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java @@ -24,8 +24,10 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection; import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; import org.apache.drill.exec.physical.resultSet.project.RequestedColumnImpl; import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn; -import org.apache.drill.exec.store.easy.text.reader.TextReader; +import org.apache.drill.exec.store.easy.text.TextFormatPlugin; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Parses the `columns` array. Doing so is surprisingly complex. @@ -68,7 +70,8 @@ import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTes */ public class ColumnsArrayParser implements ScanProjectionParser { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnsArrayParser.class); + + private static final Logger logger = LoggerFactory.getLogger(ColumnsArrayParser.class); // Config @@ -151,13 +154,13 @@ public class ColumnsArrayParser implements ScanProjectionParser { if (inCol.isArray()) { int maxIndex = inCol.maxIndex(); - if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) { + if (maxIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) { throw UserException .validationError() .message("`columns`[%d] index out of bounds, max supported size is %d", - maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS) + maxIndex, TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) .addContext("Column:", inCol.name()) - .addContext("Maximum index:", TextReader.MAXIMUM_NUMBER_COLUMNS) + .addContext("Maximum index:", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) .addContext("Actual index:", maxIndex) .addContext(builder.context()) .build(logger); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 9438870..897a7c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -186,6 +186,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR), new OptionDefinition(ExecConstants.ENABLE_UNION_TYPE), new OptionDefinition(ExecConstants.TEXT_ESTIMATED_ROW_SIZE), + new OptionDefinition(ExecConstants.TEXT_WRITER_ADD_HEADER_VALIDATOR), + new OptionDefinition(ExecConstants.TEXT_WRITER_FORCE_QUOTES_VALIDATOR), new OptionDefinition(ExecConstants.JSON_EXTENDED_TYPES), new OptionDefinition(ExecConstants.JSON_WRITER_UGLIFY), new OptionDefinition(ExecConstants.JSON_WRITER_SKIPNULLFIELDS), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 289d26c..c090f98 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -17,11 +17,11 @@ */ package org.apache.drill.exec.store.easy.text; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.common.exceptions.ChildErrorContext; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; @@ -31,9 +31,9 @@ import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; -import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder; @@ -58,14 +58,13 @@ import org.apache.drill.exec.store.easy.text.reader.TextParsingSettings; import org.apache.drill.exec.store.easy.text.writer.TextRecordWriter; import org.apache.drill.exec.store.schedule.CompleteFileWork; import org.apache.drill.exec.vector.accessor.convert.AbstractConvertFromString; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Text format plugin for CSV and other delimited text formats. @@ -83,6 +82,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> { private final static String PLUGIN_NAME = "text"; + public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024; + + public static final int MAX_CHARS_PER_COLUMN = Character.MAX_VALUE; + + public static final char NULL_CHAR = '\0'; + // Provided schema table properties unique to this plugin. If specified // in the provided schema, they override the corresponding property in // the plugin config. Names here match the field names in the format config. @@ -103,7 +108,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm @JsonInclude(Include.NON_DEFAULT) public static class TextFormatConfig implements FormatPluginConfig { - public List<String> extensions = ImmutableList.of(); + public List<String> extensions = Collections.emptyList(); public String lineDelimiter = "\n"; public char fieldDelimiter = '\n'; public char quote = '"'; @@ -125,11 +130,6 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm @JsonIgnore public boolean isHeaderExtractionEnabled() { return extractHeader; } - @JsonIgnore - public String getFieldDelimiterAsString(){ - return new String(new char[]{fieldDelimiter}); - } - @Deprecated @JsonProperty("delimiter") public void setFieldDelimiter(char delimiter){ @@ -312,14 +312,25 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm @Override public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException { - final Map<String, String> options = new HashMap<>(); + Map<String, String> options = new HashMap<>(); options.put("location", writer.getLocation()); + + TextFormatConfig config = getConfig(); + List<String> extensions = config.getExtensions(); + options.put("extension", extensions == null || extensions.isEmpty() ? null : extensions.get(0)); + FragmentHandle handle = context.getHandle(); String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); options.put("prefix", fragmentId); - options.put("separator", getConfig().getFieldDelimiterAsString()); - options.put("extension", getConfig().getExtensions().get(0)); + + options.put("addHeader", Boolean.toString(context.getOptions().getBoolean(ExecConstants.TEXT_WRITER_ADD_HEADER))); + options.put("forceQuotes", Boolean.toString(context.getOptions().getBoolean(ExecConstants.TEXT_WRITER_FORCE_QUOTES))); + + options.put("lineSeparator", config.getLineDelimiter()); + options.put("fieldDelimiter", String.valueOf(config.getFieldDelimiter())); + options.put("quote", String.valueOf(config.getQuote())); + options.put("escape", String.valueOf(config.getEscape())); RecordWriter recordWriter = new TextRecordWriter( context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java index 6e74d36..d4edaf2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.easy.text.reader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.vector.accessor.ScalarWriter; -public abstract class BaseFieldOutput extends TextOutput { +public abstract class BaseFieldOutput implements TextOutput { /** * Width of the per-field data buffer. Fields can be larger. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java index 932096a..7fe6bd5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.easy.text.reader; import org.apache.drill.exec.physical.resultSet.RowSetLoader; import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.easy.text.TextFormatPlugin; import org.apache.drill.exec.vector.accessor.ScalarWriter; /** @@ -31,20 +32,18 @@ class FieldVarCharOutput extends BaseFieldOutput { /** * We initialize and add the varchar vector for each incoming field in this * constructor. - * @param outputMutator Used to create/modify schema - * @param fieldNames Incoming field names - * @param columns List of columns selected in the query - * @param isStarQuery boolean to indicate if all fields are selected or not + * + * @param writer row set writer */ - public FieldVarCharOutput(RowSetLoader writer) { + FieldVarCharOutput(RowSetLoader writer) { super(writer, - TextReader.MAXIMUM_NUMBER_COLUMNS, + TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS, makeMask(writer)); } private static boolean[] makeMask(RowSetLoader writer) { final TupleMetadata schema = writer.tupleSchema(); - final boolean projectionMask[] = new boolean[schema.size()]; + final boolean[] projectionMask = new boolean[schema.size()]; for (int i = 0; i < schema.size(); i++) { projectionMask[i] = writer.column(i).isProjected(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java index 2fb0ffc..c561171 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java @@ -17,6 +17,12 @@ */ package org.apache.drill.exec.store.easy.text.reader; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -24,11 +30,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.drill.common.exceptions.UserException; -import org.apache.hadoop.fs.Path; - -import org.apache.drill.shaded.guava.com.google.common.base.Charsets; - /** * Text output that implements a header reader/parser. * The caller parses out the characters of each header; @@ -45,8 +46,9 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets; // and read a single row, there is no good reason to try to use // value vectors and direct memory for this task. -public class HeaderBuilder extends TextOutput { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class); +public class HeaderBuilder implements TextOutput { + + private static final Logger logger = LoggerFactory.getLogger(HeaderBuilder.class); /** * Maximum Drill symbol length, as enforced for headers. @@ -71,9 +73,9 @@ public class HeaderBuilder extends TextOutput { public static final String ANONYMOUS_COLUMN_PREFIX = "column_"; - public final Path filePath; - public final List<String> headers = new ArrayList<>(); - public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN); + private final Path filePath; + private final List<String> headers = new ArrayList<>(); + private final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN); public HeaderBuilder(Path filePath) { this.filePath = filePath; @@ -214,7 +216,7 @@ public class HeaderBuilder extends TextOutput { // Force headers to be unique. - final Set<String> idents = new HashSet<String>(); + final Set<String> idents = new HashSet<>(); for (int i = 0; i < headers.size(); i++) { String header = headers.get(i); String key = header.toLowerCase(); @@ -254,7 +256,7 @@ public class HeaderBuilder extends TextOutput { // Just return the headers: any needed checks were done in // finishRecord() - final String array[] = new String[headers.size()]; + final String[] array = new String[headers.size()]; return headers.toArray(array); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java index 7d8894f..fdf3e53 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java @@ -19,8 +19,11 @@ package org.apache.drill.exec.store.easy.text.reader; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.store.easy.text.TextFormatPlugin; import org.apache.drill.exec.vector.accessor.ArrayWriter; import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class is responsible for generating record batches for text file inputs. We generate @@ -28,23 +31,23 @@ import org.apache.drill.exec.vector.accessor.ScalarWriter; * value within the vector containing all the fields in the record as individual array elements. */ public class RepeatedVarCharOutput extends BaseFieldOutput { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); + + private static final Logger logger = LoggerFactory.getLogger(BaseFieldOutput.class); private final ScalarWriter columnWriter; - private final ArrayWriter arrayWriter; /** * Provide the row set loader (which must have just one repeated Varchar * column) and an optional array projection mask. - * @param projectionMask - * @param tupleLoader + * + * @param loader row set loader + * @param projectionMask array projection mask */ - public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) { super(loader, maxField(loader, projectionMask), projectionMask); - arrayWriter = writer.array(0); + ArrayWriter arrayWriter = writer.array(0); columnWriter = arrayWriter.scalar(); } @@ -61,7 +64,7 @@ public class RepeatedVarCharOutput extends BaseFieldOutput { // possible fields. if (projectionMask == null) { - return TextReader.MAXIMUM_NUMBER_COLUMNS; + return TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS; } // Else, this is a SELECT columns[x], columns[y], ... query. @@ -110,11 +113,11 @@ public class RepeatedVarCharOutput extends BaseFieldOutput { // this only if all fields are selected; the same query will succeed if // the user does a COUNT(*) or SELECT columns[x], columns[y], ... - if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) { + if (currentFieldIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) { throw UserException .unsupportedError() .message("Text file contains too many fields") - .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS) + .addContext("Limit", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) .build(logger); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java index d9fa973..0ca2cfe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java @@ -74,12 +74,12 @@ final class TextInput { /** * The current position in the buffer. */ - public int bufferPtr; + private int bufferPtr; /** * The quantity of valid data in the buffer. */ - public int length = -1; + private int length = -1; private boolean endFound = false; @@ -91,7 +91,7 @@ final class TextInput { * {@link TextParsingSettings#getNormalizedNewLine()}) that is used to replace any * lineSeparator sequence found in the input. */ - public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) { + TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) { this.lineSeparator = settings.getNewLineDelimiter(); byte normalizedLineSeparator = settings.getNormalizedNewLine(); Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable."); @@ -156,9 +156,8 @@ final class TextInput { * May get an incomplete string since we don't support stream rewind. Returns empty string for now. * * @return String of last few bytes. - * @throws IOException for input file read errors */ - public String getStringSinceMarkForError() throws IOException { + public String getStringSinceMarkForError() { return " "; } @@ -359,10 +358,6 @@ final class TextInput { return charCount + bufferPtr; } - public long getLineCount() { - return lineCount; - } - public void close() throws IOException{ input.close(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java index 71d4731..50e2d83 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java @@ -17,63 +17,58 @@ */ package org.apache.drill.exec.store.easy.text.reader; - /** - * Base class for producing output record batches while dealing with + * Interface for producing output record batches while dealing with * text files. Defines the interface called from text parsers to create * the corresponding value vectors (record batch). */ +interface TextOutput { -abstract class TextOutput { - - public abstract void startRecord(); + /** + * Start processing a new record. + */ + void startRecord(); /** * Start processing a new field within a record. - * @param index index within the record + * + * @param index index within the record */ - public abstract void startField(int index); + void startField(int index); /** * End processing a field within a record. - * @return true if engine should continue processing record. false if rest of record can be skipped. + * + * @return true if engine should continue processing record. false if rest of record can be skipped. */ - public abstract boolean endField(); + boolean endField(); /** * Shortcut that lets the output know that we are closing ending a field with no data. + * * @return true if engine should continue processing record. false if rest of record can be skipped. */ - public abstract boolean endEmptyField(); - - /** - * Add the provided data but drop any whitespace. - * @param data character to append - */ - public void appendIgnoringWhitespace(byte data) { - if (TextReader.isWhite(data)) { - // noop - } else { - append(data); - } - } + boolean endEmptyField(); /** - * Appends a byte to the output character data buffer - * @param data current byte read + * Appends a byte to the output character data buffer. + * + * @param data current byte read */ - public abstract void append(byte data); + void append(byte data); /** * Completes the processing of a given record. Also completes the processing of the * last field being read. */ - public abstract void finishRecord(); + void finishRecord(); /** - * Return the total number of records (across batches) processed + * Return the total number of records (across batches) processed + * + * @return record count */ - public abstract long getRecordCount(); + long getRecordCount(); /** * Indicates if the current batch is full and reading for this batch @@ -83,6 +78,5 @@ abstract class TextOutput { * the batch to be sent downstream, false if the reader may continue to * add rows to the current batch */ - - public abstract boolean isFull(); + boolean isFull(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java index 3f6dbeb..5b3f3d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java @@ -17,101 +17,130 @@ */ package org.apache.drill.exec.store.easy.text.reader; -import java.io.IOException; - import com.univocity.parsers.common.ParsingContext; +import com.univocity.parsers.common.record.Record; +import com.univocity.parsers.common.record.RecordMetaData; + +import java.util.Collections; +import java.util.Map; class TextParsingContext implements ParsingContext { private final TextInput input; private final TextOutput output; - protected boolean stopped; - private int[] extractedIndexes; + private boolean stopped; - public TextParsingContext(TextInput input, TextOutput output) { + TextParsingContext(TextInput input, TextOutput output) { this.input = input; this.output = output; } - /** - * {@inheritDoc} - */ + public boolean isFull() { + return output.isFull(); + } + + public void stop(boolean stopped) { + this.stopped = stopped; + } + @Override public void stop() { stopped = true; } - /** - * {@inheritDoc} - */ @Override public boolean isStopped() { return stopped; } - /** - * {@inheritDoc} - */ + @Override + public int errorContentLength() { + return -1; + } + + @Override + public Record toRecord(String[] row) { + return null; + } + + @Override + public RecordMetaData recordMetaData() { + return null; + } + @Override public long currentLine() { return input.lineCount(); } - /** - * {@inheritDoc} - */ @Override public long currentChar() { return input.charCount(); } - /** - * {@inheritDoc} - */ + @Override + public void skipLines(long lines) { + } + + @Override + public String[] parsedHeaders() { + return new String[0]; + } + @Override public int currentColumn() { return -1; } - /** - * {@inheritDoc} - */ @Override public String[] headers() { - return new String[]{}; + return new String[0]; + } + + @Override + public String[] selectedHeaders() { + return new String[0]; } - /** - * {@inheritDoc} - */ @Override public int[] extractedFieldIndexes() { - return extractedIndexes; + return new int[0]; } - /** - * {@inheritDoc} - */ @Override public long currentRecord() { return output.getRecordCount(); } - /** - * {@inheritDoc} - */ @Override public String currentParsedContent() { - try { - return input.getStringSinceMarkForError(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return input.getStringSinceMarkForError(); + } + + @Override + public int currentParsedContentLength() { + return input.getStringSinceMarkForError().toCharArray().length; + } + + @Override + public String fieldContentOnError() { + return null; + } + + @Override + public Map<Long, String> comments() { + return Collections.emptyMap(); } @Override - public void skipLines(int lines) { + public String lastComment() { + return null; + } + + @Override + public char[] lineSeparator() { + return new char[0]; } @Override @@ -119,8 +148,14 @@ class TextParsingContext implements ParsingContext { return false; } - public boolean isFull() { - return output.isFull(); + @Override + public int indexOf(String header) { + return -1; + } + + @Override + public int indexOf(Enum<?> header) { + return -1; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java index a91a8ab..acc1cda 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java @@ -33,7 +33,7 @@ public class TextParsingSettings { private final byte delimiter; private final byte comment; - private final long maxCharsPerColumn = Character.MAX_VALUE; + private final long maxCharsPerColumn = TextFormatPlugin.MAX_CHARS_PER_COLUMN; private final byte normalizedNewLine = b('\n'); private final byte[] newLineDelimiter; private final boolean ignoreLeadingWhitespaces = false; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java index 0ce856e..96d62f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.store.easy.text.reader; -import java.io.IOException; - -import org.apache.drill.common.exceptions.UserException; - import com.univocity.parsers.common.TextParsingException; - import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.easy.text.TextFormatPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; /******************************************************************************* * Portions Copyright 2014 uniVocity Software Pty Ltd @@ -34,11 +35,10 @@ import io.netty.buffer.DrillBuf; * DrillBuf support. */ public final class TextReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class); - private static final byte NULL_BYTE = (byte) '\0'; + private static final Logger logger = LoggerFactory.getLogger(TextReader.class); - public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024; + private static final byte NULL_BYTE = (byte) TextFormatPlugin.NULL_CHAR; private final TextParsingContext context; @@ -102,7 +102,7 @@ public final class TextReader { * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed * we have an additional check to make sure its not negative */ - static final boolean isWhite(byte b){ + static boolean isWhite(byte b){ return b <= ' ' && b > -1; } @@ -252,12 +252,15 @@ public final class TextReader { prev = NULL_BYTE; } else { prev = ch; + // read next char taking into account it can be new line indicator + // to ensure that custom new line will be replaced with normalized one + ch = input.nextChar(); + continue; } } else { if (prev == quoteEscape) { output.append(prev); - } - else if (prev == quote) { // unescaped quote detected + } else if (prev == quote) { // unescaped quote detected if (parseUnescapedQuotes) { output.append(prev); break; @@ -326,7 +329,7 @@ public final class TextReader { * @return true if more rows can be read, false if not * @throws IOException for input file read errors */ - private final boolean parseField() throws IOException { + private boolean parseField() throws IOException { output.startField(fieldIndex++); @@ -375,7 +378,7 @@ public final class TextReader { * @throws IOException for input file read errors */ public final void start() throws IOException { - context.stopped = false; + context.stop(false); input.start(); } @@ -386,7 +389,7 @@ public final class TextReader { */ public final boolean parseNext() throws IOException { try { - while (! context.stopped) { + while (!context.isStopped()) { ch = input.nextChar(); if (ch == comment) { input.skipLines(1); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java index a114fd4..fa1d62f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java @@ -17,44 +17,49 @@ */ package org.apache.drill.exec.store.easy.text.writer; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.util.List; -import java.util.Map; - +import com.univocity.parsers.csv.CsvFormat; +import com.univocity.parsers.csv.CsvWriter; +import com.univocity.parsers.csv.CsvWriterSettings; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.store.StorageStrategy; import org.apache.drill.exec.store.StringOutputRecordWriter; +import org.apache.drill.exec.store.easy.text.TextFormatPlugin; import org.apache.drill.exec.vector.complex.reader.FieldReader; -import org.apache.drill.shaded.guava.com.google.common.base.Joiner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public class TextRecordWriter extends StringOutputRecordWriter { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextRecordWriter.class); + + private static final Logger logger = LoggerFactory.getLogger(TextRecordWriter.class); private final StorageStrategy storageStrategy; + private final Configuration fsConf; + private FileSystem fs; private Path cleanUpLocation; - private String location; private String prefix; - - private String fieldDelimiter; private String extension; + // indicates number of a file created by this writer: 0_0_{fileNumberIndex}.csv (ex: 0_0_0.csv) + private int fileNumberIndex; - private int index; - private PrintStream stream = null; - private FileSystem fs = null; - - // Record write status - private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called - private StringBuilder currentRecord; // contains the current record separated by field delimiter + private CsvWriterSettings writerSettings; + private CsvWriter writer; - private Configuration fsConf; + // record write status: true once the startRecord() is called until endRecord() is called + private boolean fRecordStarted = false; public TextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) { super(allocator); @@ -66,24 +71,38 @@ public class TextRecordWriter extends StringOutputRecordWriter { public void init(Map<String, String> writerOptions) throws IOException { this.location = writerOptions.get("location"); this.prefix = writerOptions.get("prefix"); - this.fieldDelimiter = writerOptions.get("separator"); - this.extension = writerOptions.get("extension"); this.fs = FileSystem.get(fsConf); - - this.currentRecord = new StringBuilder(); - this.index = 0; + String extension = writerOptions.get("extension"); + this.extension = extension == null ? "" : "." + extension; + this.fileNumberIndex = 0; + + CsvWriterSettings writerSettings = new CsvWriterSettings(); + writerSettings.setMaxColumns(TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS); + writerSettings.setMaxCharsPerColumn(TextFormatPlugin.MAX_CHARS_PER_COLUMN); + writerSettings.setHeaderWritingEnabled(Boolean.parseBoolean(writerOptions.get("addHeader"))); + writerSettings.setQuoteAllFields(Boolean.parseBoolean(writerOptions.get("forceQuotes"))); + CsvFormat format = writerSettings.getFormat(); + format.setLineSeparator(writerOptions.get("lineSeparator")); + format.setDelimiter(writerOptions.get("fieldDelimiter")); + format.setQuote(writerOptions.get("quote").charAt(0)); + format.setQuoteEscape(writerOptions.get("escape").charAt(0)); + format.setCharToEscapeQuoteEscaping(TextFormatPlugin.NULL_CHAR); // do not escape "escape" char + + this.writerSettings = writerSettings; + + logger.trace("Text writer settings: {}", this.writerSettings); } @Override - public void startNewSchema(List<String> columnNames) throws IOException { + public void startNewSchema(BatchSchema schema) throws IOException { // wrap up the current file cleanup(); // open a new file for writing data with new schema - Path fileName = new Path(location, prefix + "_" + index + "." + extension); + Path fileName = new Path(location, String.format("%s_%s%s", prefix, fileNumberIndex, extension)); try { - // drill text writer does not support partitions, so only one file can be created + // Drill text writer does not support partitions, so only one file can be created // and thus only one location should be deleted in case of abort // to ensure that our writer was the first to create output file, // we create empty output file first and fail if file exists @@ -93,21 +112,26 @@ public class TextRecordWriter extends StringOutputRecordWriter { // we need to re-apply file permission DataOutputStream fos = fs.create(fileName); storageStrategy.applyToFile(fs, fileName); + logger.debug("Created file: {}.", fileName); - stream = new PrintStream(fos); - logger.debug("Created file: {}", fileName); - } catch (IOException ex) { - logger.error("Unable to create file: " + fileName, ex); - throw ex; + // increment file number index + fileNumberIndex++; + + this.writer = new CsvWriter(fos, writerSettings); + } catch (IOException e) { + throw new IOException(String.format("Unable to create file: %s.", fileName), e); } - index++; - stream.println(Joiner.on(fieldDelimiter).join(columnNames)); + if (writerSettings.isHeaderWritingEnabled()) { + writer.writeHeaders(StreamSupport.stream(schema.spliterator(), false) + .map(MaterializedField::getName) + .collect(Collectors.toList())); + } } @Override - public void addField(int fieldId, String value) throws IOException { - currentRecord.append(value + fieldDelimiter); + public void addField(int fieldId, String value) { + writer.addValue(value); } @Override @@ -115,7 +139,6 @@ public class TextRecordWriter extends StringOutputRecordWriter { if (fRecordStarted) { throw new IOException("Previous record is not written completely"); } - fRecordStarted = true; } @@ -125,13 +148,7 @@ public class TextRecordWriter extends StringOutputRecordWriter { throw new IOException("No record is in writing"); } - // remove the extra delimiter at the end - currentRecord.deleteCharAt(currentRecord.length()-fieldDelimiter.length()); - - stream.println(currentRecord.toString()); - - // reset current record status - currentRecord.delete(0, currentRecord.length()); + writer.writeValuesToRow(); fRecordStarted = false; } @@ -164,11 +181,16 @@ public class TextRecordWriter extends StringOutputRecordWriter { @Override public void cleanup() throws IOException { - super.cleanup(); - if (stream != null) { - stream.close(); - stream = null; - logger.debug("closing file"); + fRecordStarted = false; + if (writer != null) { + try { + writer.close(); + writer = null; + logger.debug("Closed text writer for file: {}.", cleanUpLocation); + } catch (IllegalStateException e) { + throw new IOException(String.format("Unable to close text writer for file %s: %s", + cleanUpLocation, e.getMessage()), e); + } } } @@ -180,5 +202,4 @@ public class TextRecordWriter extends StringOutputRecordWriter { cleanUpLocation.toUri().getPath(), fs.getUri()); } } - } diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 7dfafd3..b276389 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -660,6 +660,8 @@ drill.exec.options: { store.table.use_schema_file: false, store.partition.hash_distribute: false, store.text.estimated_row_size_bytes: 100.0, + store.text.writer.add_header: true, + store.text.writer.force_quotes: false, store.kafka.all_text_mode: false, store.kafka.read_numbers_as_double: false, store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader", diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java new file mode 100644 index 0000000..6ad38bc --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.writer; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig; +import static org.junit.Assert.assertEquals; + +public class TestTextWriter extends ClusterTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private static final List<String> tablesToDrop = new ArrayList<>(); + + @BeforeClass + public static void setup() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); + startCluster(builder); + + Map<String, FormatPluginConfig> formats = new HashMap<>(); + + TextFormatConfig csv = new TextFormatConfig(); + csv.extensions = Collections.singletonList("csv"); + csv.lineDelimiter = "\n"; + csv.fieldDelimiter = ','; + csv.quote = '"'; + csv.escape = '"'; + csv.extractHeader = true; + formats.put("csv", csv); + + TextFormatConfig tsv = new TextFormatConfig(); + tsv.extensions = Collections.singletonList("tsv"); + tsv.lineDelimiter = "\n"; + tsv.fieldDelimiter = '\t'; + tsv.quote = '"'; + tsv.escape = '"'; + tsv.extractHeader = true; + formats.put("tsv", tsv); + + TextFormatConfig custom = new TextFormatConfig(); + custom.extensions = Collections.singletonList("custom"); + custom.lineDelimiter = "!"; + custom.fieldDelimiter = '_'; + custom.quote = '$'; + custom.escape = '^'; + custom.extractHeader = true; + formats.put("custom", custom); + + cluster.defineFormats("dfs", formats); + } + + @After + public void cleanUp() { + client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION); + client.resetSession(ExecConstants.TEXT_WRITER_ADD_HEADER); + client.resetSession(ExecConstants.TEXT_WRITER_FORCE_QUOTES); + + tablesToDrop.forEach( + table -> client.runSqlSilently(String.format("drop table if exists %s", table))); + } + + @Test + public void testWithHeaders() throws Exception { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv"); + + String tableName = "csv_with_headers_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + queryBuilder().sql("create table %s as select 'a' as col1, 'b' as col2 from (values(1))", fullTableName).run(); + + Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv"); + List<String> lines = Files.readAllLines(path); + assertEquals(Arrays.asList("col1,col2", "a,b"), lines); + } + + @Test + public void testWithoutHeaders() throws Exception { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv"); + client.alterSession(ExecConstants.TEXT_WRITER_ADD_HEADER, false); + + String tableName = "csv_without_headers_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + queryBuilder().sql("create table %s as select 'a' as col1, 'b' as col2 from (values(1))", fullTableName).run(); + + Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv"); + List<String> lines = Files.readAllLines(path); + assertEquals(Collections.singletonList("a,b"), lines); + } + + @Test + public void testNoQuotes() throws Exception { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv"); + + String tableName = "csv_no_quotes_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + queryBuilder().sql("create table %s as " + + "select 1 as id, 'Bob' as name, 'A B C' as desc from (values(1))", fullTableName).run(); + + testBuilder() + .sqlQuery("select * from %s", fullTableName) + .unOrdered() + .baselineColumns("id", "name", "desc") + .baselineValues("1", "Bob", "A B C") + .go(); + + Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv"); + List<String> lines = Files.readAllLines(path); + assertEquals(Arrays.asList("id,name,desc", "1,Bob,A B C"), lines); + } + + @Test + public void testQuotesOnDemand() throws Exception { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv"); + + String tableName = "csv_quotes_on_demand_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + queryBuilder().sql("create table %s as " + + "select 1 as id, 'Bob\nSmith' as name, 'A,B,C' as desc from (values(1))", fullTableName).run(); + + testBuilder() + .sqlQuery("select * from %s", fullTableName) + .unOrdered() + .baselineColumns("id", "name", "desc") + .baselineValues("1", "Bob\nSmith", "A,B,C") + .go(); + + Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv"); + List<String> lines = Files.readAllLines(path); + assertEquals(Arrays.asList("id,name,desc", "1,\"Bob", "Smith\",\"A,B,C\""), lines); + } + + @Test + public void testForceQuotes() throws Exception { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv"); + client.alterSession(ExecConstants.TEXT_WRITER_FORCE_QUOTES, true); + + String tableName = "csv_force_quotes_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + queryBuilder().sql("create table %s as " + + "select 1 as id, 'Bob' as name, 'A,B,C' as desc from (values(1))", fullTableName).run(); + + testBuilder() + .sqlQuery("select * from %s", fullTableName) + .unOrdered() + .baselineColumns("id", "name", "desc") + .baselineValues("1", "Bob", "A,B,C") + .go(); + + Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv"); + List<String> lines = Files.readAllLines(path); + assertEquals(Arrays.asList("\"id\",\"name\",\"desc\"", "\"1\",\"Bob\",\"A,B,C\""), lines); + } + + @Test + public void testTsv() throws Exception { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "tsv"); + + String tableName = "tsv_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + queryBuilder().sql("create table %s as " + + "select 1 as id, 'Bob\tSmith' as name, 'A\"B\"C' as desc from (values(1))", fullTableName).run(); + + testBuilder() + .sqlQuery("select * from %s", fullTableName) + .unOrdered() + .baselineColumns("id", "name", "desc") + .baselineValues("1", "Bob\tSmith", "A\"B\"C") + .go(); + + Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.tsv"); + List<String> lines = Files.readAllLines(path); + assertEquals(Arrays.asList("id\tname\tdesc", "1\t\"Bob\tSmith\"\tA\"B\"C"), lines); + } + + @Test + public void testCustomFormat() throws Exception { + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "custom"); + + String tableName = "custom_format_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + queryBuilder().sql("create table %s as " + + "select 1 as `id_`, 'Bob$Smith' as name, 'A^B!C' as desc from (values(1))", fullTableName).run(); + + testBuilder() + .sqlQuery("select * from %s", fullTableName) + .unOrdered() + .baselineColumns("id_", "name", "desc") + .baselineValues("1", "Bob$Smith", "A^B!C") + .go(); + + Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.custom"); + List<String> lines = Files.readAllLines(path); + assertEquals(Collections.singletonList("$id_$_name_desc!1_Bob$Smith_$A^B!C$!"), lines); + } + + @Test + public void testLineDelimiterLengthLimit() throws Exception { + TextFormatConfig incorrect = new TextFormatConfig(); + incorrect.lineDelimiter = "end"; + cluster.defineFormat("dfs", "incorrect", incorrect); + + client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "incorrect"); + + String tableName = "incorrect_line_delimiter_table"; + String fullTableName = String.format("dfs.tmp.`%s`", tableName); + tablesToDrop.add(fullTableName); + + // univocity-parsers allow only 1 - 2 characters line separators + thrown.expect(UserException.class); + thrown.expectMessage("Invalid line separator"); + + queryBuilder().sql("create table %s as select 1 as id from (values(1))", fullTableName).run(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java index 67ca4c1..a22d11c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java @@ -17,25 +17,26 @@ */ package org.apache.drill.exec.store.easy.text.compliant; -import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; - -import java.io.File; -import java.io.FileWriter; -import java.io.PrintWriter; - import org.apache.drill.TestSelectWithOption; import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetBuilder; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.easy.text.TextFormatPlugin; -import org.apache.drill.exec.physical.rowSet.RowSet; -import org.apache.drill.exec.physical.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetUtilities; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.File; +import java.io.FileWriter; +import java.io.PrintWriter; + +import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; +import static org.junit.Assert.assertTrue; + /** * Test table properties with the compliant text reader. The * table properties override selected properties in the format @@ -49,7 +50,7 @@ import org.junit.experimental.categories.Category; * using that schema rather than using the "columns" array * column. * - * @see {@link TestSelectWithOption} for similar tests using table + * @see TestSelectWithOption for similar tests using table * properties within SQL */ @@ -89,9 +90,9 @@ public class TestCsvTableProperties extends BaseCsvTest { .build(); } - public static String SELECT_ALL = "SELECT * FROM %s"; + private static final String SELECT_ALL = "SELECT * FROM %s"; - private static String noHeaders[] = { + private static final String[] noHeaders = { "10,fred", "20,wilma" }; @@ -122,7 +123,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String extraCols[] = { + private static final String[] extraCols = { "10,fred,23.45", "20,wilma,1234.56,vip" }; @@ -148,7 +149,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String skipHeaders[] = { + private static final String[] skipHeaders = { "ignore,me", "10,fred", "20,wilma" @@ -180,7 +181,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String withHeaders[] = { + private static final String[] withHeaders = { "id, name", "10,fred", "20,wilma" @@ -220,7 +221,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String barDelim[] = { + private static final String[] barDelim = { "10|fred", "20|wilma" }; @@ -241,7 +242,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String customCommentChar[] = { + private static final String[] customCommentChar = { "@Comment", "#10,fred", "#20,wilma" @@ -273,7 +274,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String noCommentChar[] = { + private static final String[] noCommentChar = { "#10,fred", "#20,wilma" }; @@ -301,7 +302,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String quotesData[] = { + private static final String[] quotesData = { "1,@foo@", "2,@foo~@bar@", @@ -342,7 +343,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String doubleQuotesData[] = { + private static final String[] doubleQuotesData = { "1,@foo@", "2,@foo@@bar@", }; @@ -380,7 +381,38 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String specialCharsData[] = { + private static final String[] quotesAndCustomNewLineData = { + "1,@foo@!2,@foo@@bar@!", + }; + + @Test + public void testQuotesAndCustomNewLine() throws Exception { + try { + enableSchemaSupport(); + String tablePath = buildTable("quotesAndCustomNewLine", quotesAndCustomNewLineData); + String sql = "create schema () " + + "for table " + tablePath + " PROPERTIES ('" + + TextFormatPlugin.HAS_HEADERS_PROP + "'='false', '" + + TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='false', '" + + TextFormatPlugin.LINE_DELIM_PROP + "'='!', '" + + TextFormatPlugin.QUOTE_PROP + "'='@', '" + + TextFormatPlugin.QUOTE_ESCAPE_PROP + "'='@')"; + run(sql); + RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addSingleCol(strArray("1", "foo")) + .addSingleCol(strArray("2", "foo@bar")) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetSchemaSupport(); + } + } + + private static final String[] specialCharsData = { "10\u0001'fred'", "20\u0001'wilma'" }; @@ -428,7 +460,7 @@ public class TestCsvTableProperties extends BaseCsvTest { enableSchemaSupport(); String tableName = "newline"; File rootDir = new File(testDir, tableName); - rootDir.mkdir(); + assertTrue(rootDir.mkdir()); try(PrintWriter out = new PrintWriter(new FileWriter(new File(rootDir, ROOT_FILE)))) { out.print("1,fred\r2,wilma\r"); } @@ -453,7 +485,7 @@ public class TestCsvTableProperties extends BaseCsvTest { } } - private static String messyQuotesData[] = { + private static final String[] messyQuotesData = { "first\"field\"here,another \"field", "end quote\",another\"", "many\"\"\"\",more\"\"", diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java index a26cf2f..91584dd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java @@ -17,24 +17,6 @@ */ package org.apache.drill.test; -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URL; -import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -import org.apache.drill.exec.store.SchemaFactory; -import org.apache.drill.test.DrillTestWrapper.TestServices; import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.FormatPluginConfig; @@ -48,6 +30,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.store.SchemaFactory; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.StoragePluginRegistryImpl; import org.apache.drill.exec.store.dfs.FileSystemConfig; @@ -57,10 +40,27 @@ import org.apache.drill.exec.store.mock.MockStorageEngine; import org.apache.drill.exec.store.mock.MockStorageEngineConfig; import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; import org.apache.drill.exec.util.StoragePluginTestUtils; - import org.apache.drill.shaded.guava.com.google.common.base.Charsets; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; import org.apache.drill.shaded.guava.com.google.common.io.Resources; +import org.apache.drill.test.DrillTestWrapper.TestServices; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URL; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA; import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA; @@ -416,6 +416,10 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { } } zkHelper = null; + + if (ex != null) { + throw ex; + } } /** @@ -487,7 +491,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { } } - public static void defineWorkspace(Drillbit drillbit, String pluginName, + private void defineWorkspace(Drillbit drillbit, String pluginName, String schemaName, String path, String defaultFormat, FormatPluginConfig format) throws ExecutionSetupException { final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage(); @@ -500,20 +504,56 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { .ifPresent(newWorkspaces::putAll); newWorkspaces.put(schemaName, newTmpWSConfig); - Map<String, FormatPluginConfig> newFormats = new HashMap<>(pluginConfig.getFormats()); + Map<String, FormatPluginConfig> newFormats = new HashMap<>(); Optional.ofNullable(pluginConfig.getFormats()) .ifPresent(newFormats::putAll); Optional.ofNullable(format) .ifPresent(f -> newFormats.put(defaultFormat, f)); + updatePlugin(pluginRegistry, pluginName, pluginConfig, newWorkspaces, newFormats); + } + + public void defineFormat(String pluginName, String name, FormatPluginConfig config) { + defineFormats(pluginName, ImmutableMap.of(name, config)); + } + + public void defineFormats(String pluginName, Map<String, FormatPluginConfig> formats) { + for (Drillbit bit : drillbits()) { + try { + defineFormats(bit, pluginName, formats); + } catch (ExecutionSetupException e) { + throw new IllegalStateException(e); + } + } + } + + private void defineFormats(Drillbit drillbit, + String pluginName, + Map<String, FormatPluginConfig> formats) throws ExecutionSetupException { + StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage(); + FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(pluginName); + FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig(); + + Map<String, FormatPluginConfig> newFormats = new HashMap<>(); + Optional.ofNullable(pluginConfig.getFormats()) + .ifPresent(newFormats::putAll); + newFormats.putAll(formats); + + updatePlugin(pluginRegistry, pluginName, pluginConfig, null, newFormats); + } + + private void updatePlugin(StoragePluginRegistry pluginRegistry, + String pluginName, + FileSystemConfig pluginConfig, + Map<String, WorkspaceConfig> newWorkspaces, + Map<String, FormatPluginConfig> newFormats) throws ExecutionSetupException { FileSystemConfig newPluginConfig = new FileSystemConfig( - pluginConfig.getConnection(), - pluginConfig.getConfig(), - newWorkspaces, - newFormats); + pluginConfig.getConnection(), + pluginConfig.getConfig(), + newWorkspaces == null ? pluginConfig.getWorkspaces() : newWorkspaces, + newFormats == null ? pluginConfig.getFormats() : newFormats); newPluginConfig.setEnabled(pluginConfig.isEnabled()); - pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true); } @@ -580,8 +620,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { * Return a cluster fixture built with standard options. This is a short-cut * for simple tests that don't need special setup. * + * @param dirTestWatcher directory test watcher * @return a cluster fixture with standard options - * @throws Exception if something goes wrong */ public static ClusterFixture standardCluster(BaseDirTestWatcher dirTestWatcher) { return builder(dirTestWatcher).build();