[
https://issues.apache.org/jira/browse/DRILL-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390244#comment-17390244
]
ASF GitHub Bot commented on DRILL-7978:
---------------------------------------
cgivre commented on a change in pull request #2282:
URL: https://github.com/apache/drill/pull/2282#discussion_r679426218
##########
File path: contrib/format-fixedwidth/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
Review comment:
Nit: Can you please reformat this with 2-space indentation?
##########
File path:
contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,126 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ FixedwidthFormatConfig formatConfig = new
FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+ Lists.newArrayList(
+ new FixedwidthFieldConfig(TypeProtos.MinorType.INT, "Number", "",
1, 4),
+ new FixedwidthFieldConfig(TypeProtos.MinorType.VARCHAR, "Letter",
"", 6, 4),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.INT,"Address","",11,3),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.DATE,"Date","MM-dd-yyyy",15,10),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.TIME,"Time","HH:mm:ss",26,8),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.TIMESTAMP,"DateTime","MM-dd-yyyy'T'HH:mm:ss.SSX",35,23)
+ ));
+ cluster.defineFormat("cp", "fwf", formatConfig);
+
+ // Needed for compressed file unit test
+ dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception {
+ String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav`
WHERE d16=4";
+
+ QueryBuilder q = client.queryBuilder().sql(sql);
+ RowSet results = q.rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("ID", TypeProtos.MinorType.FLOAT8)
+ .addNullable("Urban", TypeProtos.MinorType.FLOAT8)
+ .addNullable("Urban_value", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(47.0, 1.0, "Urban").addRow(53.0, 1.0, "Urban")
+ .addRow(66.0, 1.0, "Urban")
+ .build();
+
+ assertEquals(3, results.rowCount());
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testBatchReader() throws Exception {
+ String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("Number", TypeProtos.MinorType.INT)
+ .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+ .addNullable("Address", TypeProtos.MinorType.INT)
+ .addNullable("Date", TypeProtos.MinorType.DATE)
+ .addNullable("Time",TypeProtos.MinorType.TIME)
+ .addNullable("DateTime",TypeProtos.MinorType.TIMESTAMP)
+ .buildSchema();
+
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1234, "test", 567, LocalDate.parse("2021-02-10"),
LocalTime.parse("10:30:27"), Instant.parse("2021-02-10T15:30:27.00Z"))
+ .addRow(5678, "TEST", 890, LocalDate.parse("2021-07-27"),
LocalTime.parse("12:40:15"), Instant.parse("2021-07-27T16:40:15.00Z"))
+ .addRow(1111, "abcd", 111, LocalDate.parse("1111-11-11"),
LocalTime.parse("11:11:11"), Instant.parse("1111-11-11T16:28:43.11Z"))
+ .addRow(2222, "efgh", 222, LocalDate.parse("2222-01-22"),
LocalTime.parse("22:22:22"), Instant.parse("2222-01-23T03:22:22.22Z"))
+ .addRow(3333, "ijkl", 333, LocalDate.parse("3333-02-01"),
LocalTime.parse("01:33:33"), Instant.parse("3333-02-01T06:33:33.33Z"))
+ .addRow(4444, "mnop", 444, LocalDate.parse("4444-03-02"),
LocalTime.parse("02:44:44"), Instant.parse("4444-03-02T07:44:44.44Z"))
+ .addRow(5555, "qrst", 555, LocalDate.parse("5555-04-03"),
LocalTime.parse("03:55:55"), Instant.parse("5555-04-03T07:55:55.55Z"))
+ .addRow(6666, "uvwx", 666, LocalDate.parse("6666-05-04"),
LocalTime.parse("04:01:01"), Instant.parse("6666-05-04T08:01:01.01Z"))
+ .addRow(7777, "yzzz", 777, LocalDate.parse("7777-06-05"),
LocalTime.parse("05:11:11"), Instant.parse("7777-06-05T09:11:11.11Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .build();
+
+ System.out.println(expected);
+ assertEquals(25, results.rowCount());
+
+ //System.out.println(results.batchSchema());
Review comment:
Please remove any calls to `System.out.println` in unit tests.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
+ return true;
+
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+
+ String line;
+
+ try {
+ line = reader.readLine();
+ RowSetLoader writer = loader.writer();
+
+ while (!writer.isFull() && line != null) {
+
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+
+ line = reader.readLine();
+ }
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ return (line != null);
+ }
+
+ @Override
+ public void close() {
+ try {
+ fsStream.close();
+ loader.close();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to close input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ }
+
+ private TupleMetadata buildSchema(){
+ SchemaBuilder builder = new SchemaBuilder();
+
+ for (FixedwidthFieldConfig field : config.getFields()){
+ builder.addNullable(field.getFieldName(),field.getDataType());
+ }
+
+ return builder.buildSchema();
+ }
+
+ private void parseLine(String line, RowSetLoader writer) {
+ int i = 0;
+ TypeProtos.MinorType dataType;
+ String dateTimeFormat;
+ String value;
+
+ for (FixedwidthFieldConfig field : config.getFields()) {
+ value = line.substring(field.getStartIndex() - 1, field.getStartIndex()
+ field.getFieldWidth() - 1);
+
+ dataType = field.getDataType();
+ dateTimeFormat = field.getDateTimeFormat();
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+
+ switch (dataType) {
+ case INT:
+ writer.scalar(i).setInt(Integer.parseInt(value));
+ break;
+ case VARCHAR:
+ writer.scalar(i).setString(value);
+ break;
+ case DATE:
+ LocalDate date = LocalDate.parse(value, formatter);
+ writer.scalar(i).setDate(date);
+ break;
+ case TIME:
+ LocalTime time = LocalTime.parse(value, formatter);
+ writer.scalar(i).setTime(time);
+ break;
+ case TIMESTAMP:
+ LocalDateTime ldt = LocalDateTime.parse(value,formatter);
+ ZoneId z = ZoneId.of( "America/Toronto" );
+ ZonedDateTime zdt = ldt.atZone( z );
+ Instant timeStamp = zdt.toInstant();
+ writer.scalar(i).setTimestamp(timeStamp);
+ break;
+ default:
+ throw new RuntimeException("Unknown data type specified in fixed
width. Found data type " + dataType);
+
Review comment:
Nit: Please remove excess whitespace here and elsewhere.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
+ return true;
+
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+
+ String line;
+
+ try {
+ line = reader.readLine();
+ RowSetLoader writer = loader.writer();
+
+ while (!writer.isFull() && line != null) {
+
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+
+ line = reader.readLine();
+ }
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ return (line != null);
+ }
+
+ @Override
+ public void close() {
+ try {
+ fsStream.close();
+ loader.close();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to close input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ }
+
+ private TupleMetadata buildSchema(){
+ SchemaBuilder builder = new SchemaBuilder();
+
+ for (FixedwidthFieldConfig field : config.getFields()){
+ builder.addNullable(field.getFieldName(),field.getDataType());
+ }
+
+ return builder.buildSchema();
+ }
+
+ private void parseLine(String line, RowSetLoader writer) {
+ int i = 0;
+ TypeProtos.MinorType dataType;
+ String dateTimeFormat;
+ String value;
+
+ for (FixedwidthFieldConfig field : config.getFields()) {
+ value = line.substring(field.getStartIndex() - 1, field.getStartIndex()
+ field.getFieldWidth() - 1);
+
+ dataType = field.getDataType();
+ dateTimeFormat = field.getDateTimeFormat();
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+
+ switch (dataType) {
+ case INT:
Review comment:
It looks like we are only supporting INT, VARCHAR, DATE, TIME, and
TIMESTAMP. Do we want to support a few other data types, such as `LONG` or
`DOUBLE`?
##########
File path:
contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,126 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ FixedwidthFormatConfig formatConfig = new
FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+ Lists.newArrayList(
+ new FixedwidthFieldConfig(TypeProtos.MinorType.INT, "Number", "",
1, 4),
+ new FixedwidthFieldConfig(TypeProtos.MinorType.VARCHAR, "Letter",
"", 6, 4),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.INT,"Address","",11,3),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.DATE,"Date","MM-dd-yyyy",15,10),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.TIME,"Time","HH:mm:ss",26,8),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.TIMESTAMP,"DateTime","MM-dd-yyyy'T'HH:mm:ss.SSX",35,23)
+ ));
+ cluster.defineFormat("cp", "fwf", formatConfig);
+
+ // Needed for compressed file unit test
+ dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception {
+ String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav`
WHERE d16=4";
+
+ QueryBuilder q = client.queryBuilder().sql(sql);
+ RowSet results = q.rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("ID", TypeProtos.MinorType.FLOAT8)
+ .addNullable("Urban", TypeProtos.MinorType.FLOAT8)
+ .addNullable("Urban_value", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(47.0, 1.0, "Urban").addRow(53.0, 1.0, "Urban")
+ .addRow(66.0, 1.0, "Urban")
+ .build();
+
+ assertEquals(3, results.rowCount());
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testBatchReader() throws Exception {
+ String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("Number", TypeProtos.MinorType.INT)
+ .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+ .addNullable("Address", TypeProtos.MinorType.INT)
+ .addNullable("Date", TypeProtos.MinorType.DATE)
+ .addNullable("Time",TypeProtos.MinorType.TIME)
+ .addNullable("DateTime",TypeProtos.MinorType.TIMESTAMP)
+ .buildSchema();
+
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1234, "test", 567, LocalDate.parse("2021-02-10"),
LocalTime.parse("10:30:27"), Instant.parse("2021-02-10T15:30:27.00Z"))
+ .addRow(5678, "TEST", 890, LocalDate.parse("2021-07-27"),
LocalTime.parse("12:40:15"), Instant.parse("2021-07-27T16:40:15.00Z"))
+ .addRow(1111, "abcd", 111, LocalDate.parse("1111-11-11"),
LocalTime.parse("11:11:11"), Instant.parse("1111-11-11T16:28:43.11Z"))
+ .addRow(2222, "efgh", 222, LocalDate.parse("2222-01-22"),
LocalTime.parse("22:22:22"), Instant.parse("2222-01-23T03:22:22.22Z"))
+ .addRow(3333, "ijkl", 333, LocalDate.parse("3333-02-01"),
LocalTime.parse("01:33:33"), Instant.parse("3333-02-01T06:33:33.33Z"))
+ .addRow(4444, "mnop", 444, LocalDate.parse("4444-03-02"),
LocalTime.parse("02:44:44"), Instant.parse("4444-03-02T07:44:44.44Z"))
+ .addRow(5555, "qrst", 555, LocalDate.parse("5555-04-03"),
LocalTime.parse("03:55:55"), Instant.parse("5555-04-03T07:55:55.55Z"))
+ .addRow(6666, "uvwx", 666, LocalDate.parse("6666-05-04"),
LocalTime.parse("04:01:01"), Instant.parse("6666-05-04T08:01:01.01Z"))
+ .addRow(7777, "yzzz", 777, LocalDate.parse("7777-06-05"),
LocalTime.parse("05:11:11"), Instant.parse("7777-06-05T09:11:11.11Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .build();
+
+ System.out.println(expected);
+ assertEquals(25, results.rowCount());
+
+ //System.out.println(results.batchSchema());
+ System.out.println(results);
+
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ System.out.println("Test complete.");
Review comment:
These last two lines are not necessary.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
+ return true;
+
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+
+ String line;
+
+ try {
+ line = reader.readLine();
+ RowSetLoader writer = loader.writer();
+
+ while (!writer.isFull() && line != null) {
+
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+
+ line = reader.readLine();
+ }
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ return (line != null);
+ }
+
+ @Override
+ public void close() {
+ try {
+ fsStream.close();
+ loader.close();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to close input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ }
+
+ private TupleMetadata buildSchema(){
+ SchemaBuilder builder = new SchemaBuilder();
+
+ for (FixedwidthFieldConfig field : config.getFields()){
+ builder.addNullable(field.getFieldName(),field.getDataType());
+ }
+
+ return builder.buildSchema();
+ }
+
+ private void parseLine(String line, RowSetLoader writer) {
Review comment:
See the comment above, but I'd recommend making this method return a boolean
value.
##########
File path:
contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,126 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ FixedwidthFormatConfig formatConfig = new
FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+ Lists.newArrayList(
+ new FixedwidthFieldConfig(TypeProtos.MinorType.INT, "Number", "",
1, 4),
+ new FixedwidthFieldConfig(TypeProtos.MinorType.VARCHAR, "Letter",
"", 6, 4),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.INT,"Address","",11,3),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.DATE,"Date","MM-dd-yyyy",15,10),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.TIME,"Time","HH:mm:ss",26,8),
+ new
FixedwidthFieldConfig(TypeProtos.MinorType.TIMESTAMP,"DateTime","MM-dd-yyyy'T'HH:mm:ss.SSX",35,23)
+ ));
+ cluster.defineFormat("cp", "fwf", formatConfig);
+
+ // Needed for compressed file unit test
+ dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception {
+ String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav`
WHERE d16=4";
+
+ QueryBuilder q = client.queryBuilder().sql(sql);
+ RowSet results = q.rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("ID", TypeProtos.MinorType.FLOAT8)
+ .addNullable("Urban", TypeProtos.MinorType.FLOAT8)
+ .addNullable("Urban_value", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(47.0, 1.0, "Urban").addRow(53.0, 1.0, "Urban")
+ .addRow(66.0, 1.0, "Urban")
+ .build();
+
+ assertEquals(3, results.rowCount());
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testBatchReader() throws Exception {
+ String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("Number", TypeProtos.MinorType.INT)
+ .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+ .addNullable("Address", TypeProtos.MinorType.INT)
+ .addNullable("Date", TypeProtos.MinorType.DATE)
+ .addNullable("Time",TypeProtos.MinorType.TIME)
+ .addNullable("DateTime",TypeProtos.MinorType.TIMESTAMP)
+ .buildSchema();
+
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1234, "test", 567, LocalDate.parse("2021-02-10"),
LocalTime.parse("10:30:27"), Instant.parse("2021-02-10T15:30:27.00Z"))
+ .addRow(5678, "TEST", 890, LocalDate.parse("2021-07-27"),
LocalTime.parse("12:40:15"), Instant.parse("2021-07-27T16:40:15.00Z"))
+ .addRow(1111, "abcd", 111, LocalDate.parse("1111-11-11"),
LocalTime.parse("11:11:11"), Instant.parse("1111-11-11T16:28:43.11Z"))
+ .addRow(2222, "efgh", 222, LocalDate.parse("2222-01-22"),
LocalTime.parse("22:22:22"), Instant.parse("2222-01-23T03:22:22.22Z"))
+ .addRow(3333, "ijkl", 333, LocalDate.parse("3333-02-01"),
LocalTime.parse("01:33:33"), Instant.parse("3333-02-01T06:33:33.33Z"))
+ .addRow(4444, "mnop", 444, LocalDate.parse("4444-03-02"),
LocalTime.parse("02:44:44"), Instant.parse("4444-03-02T07:44:44.44Z"))
+ .addRow(5555, "qrst", 555, LocalDate.parse("5555-04-03"),
LocalTime.parse("03:55:55"), Instant.parse("5555-04-03T07:55:55.55Z"))
+ .addRow(6666, "uvwx", 666, LocalDate.parse("6666-05-04"),
LocalTime.parse("04:01:01"), Instant.parse("6666-05-04T08:01:01.01Z"))
+ .addRow(7777, "yzzz", 777, LocalDate.parse("7777-06-05"),
LocalTime.parse("05:11:11"), Instant.parse("7777-06-05T09:11:11.11Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"),
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+ .build();
+
+ System.out.println(expected);
+ assertEquals(25, results.rowCount());
+
+ //System.out.println(results.batchSchema());
+ System.out.println(results);
+
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ System.out.println("Test complete.");
Review comment:
Please add tests for:
* Serialization/Deserialization
* Compressed file
* Various invalid schemata. For instance, what happens if you don't have
fields defined in the config and try to query the data?
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,71 @@
+package org.apache.drill.exec.store.fixedwidth;
Review comment:
Please add Apache license to all files.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
+ return true;
+
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+
Review comment:
There is a small issue with the `next()` function as written. You define
`maxRecords` but don't do anything with it. The `maxRecords` is the `LIMIT`
which gets pushed down from the query. The idea being that if a user does a
`SELECT ... LIMIT 10` your reader should stop reading as soon as that limit is
reached. Which means that the `next` function should return `false` when the
limit has been reached. The good news is that your `writer` object actually
has a method called `limitReached(<maxRecords>)` which will track this for you.
Another thing you might consider doing which would clean up the code a bit
would be to make the `parseLine` method return `true` if there is more data to
read, `false` if not. Also move the writer start and end to that method, then
you could have a `next` method that looks like this:
```java
@Override
public boolean next() {
recordCount = 0;
while (!writer.isFull()) {
if (!parseLine(writer)) {
return false;
}
}
return true;
}
```
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,71 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.types.TypeProtos;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("fixedwidthReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Review comment:
For all serializable classes in Drill, I would recommend overriding
`equals()`, `hashCode()`, and `toString()` methods.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
+ return true;
+
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+
+ String line;
+
+ try {
+ line = reader.readLine();
+ RowSetLoader writer = loader.writer();
+
+ while (!writer.isFull() && line != null) {
+
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+
+ line = reader.readLine();
+ }
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ return (line != null);
+ }
+
+ @Override
+ public void close() {
+ try {
+ fsStream.close();
+ loader.close();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to close input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ }
+
+ private TupleMetadata buildSchema(){
+ SchemaBuilder builder = new SchemaBuilder();
+
+ for (FixedwidthFieldConfig field : config.getFields()){
+ builder.addNullable(field.getFieldName(),field.getDataType());
+ }
+
+ return builder.buildSchema();
+ }
+
+ private void parseLine(String line, RowSetLoader writer) {
+ int i = 0;
+ TypeProtos.MinorType dataType;
+ String dateTimeFormat;
+ String value;
+
+ for (FixedwidthFieldConfig field : config.getFields()) {
+ value = line.substring(field.getStartIndex() - 1, field.getStartIndex()
+ field.getFieldWidth() - 1);
+
+ dataType = field.getDataType();
+ dateTimeFormat = field.getDateTimeFormat();
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+
+ switch (dataType) {
+ case INT:
+ writer.scalar(i).setInt(Integer.parseInt(value));
+ break;
+ case VARCHAR:
+ writer.scalar(i).setString(value);
+ break;
+ case DATE:
+ LocalDate date = LocalDate.parse(value, formatter);
+ writer.scalar(i).setDate(date);
+ break;
+ case TIME:
+ LocalTime time = LocalTime.parse(value, formatter);
+ writer.scalar(i).setTime(time);
+ break;
+ case TIMESTAMP:
+ LocalDateTime ldt = LocalDateTime.parse(value,formatter);
+ ZoneId z = ZoneId.of( "America/Toronto" );
+ ZonedDateTime zdt = ldt.atZone( z );
+ Instant timeStamp = zdt.toInstant();
+ writer.scalar(i).setTimestamp(timeStamp);
+ break;
+ default:
+ throw new RuntimeException("Unknown data type specified in fixed
width. Found data type " + dataType);
Review comment:
Please convert this to a `UserException`.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
+ return true;
+
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+
+ String line;
+
+ try {
+ line = reader.readLine();
+ RowSetLoader writer = loader.writer();
+
+ while (!writer.isFull() && line != null) {
+
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+
+ line = reader.readLine();
+ }
+ } catch (Exception e) {
Review comment:
I'd suggest tightening this a bit. Instead of capturing a generic
`Exception`, perhaps use specific exceptions with different error messages.
Alternatively just put the try/catch around the line(s) in which data is being
read.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatPlugin.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+public class FixedwidthFormatPlugin extends
EasyFormatPlugin<FixedwidthFormatConfig> {
+
+ protected static final String DEFAULT_NAME = "fixedwidth";
+
+ private static class FixedwidthReaderFactory extends FileReaderFactory {
+
+ private final FixedwidthFormatConfig config;
+ private final int maxRecords;
+
+ public FixedwidthReaderFactory(FixedwidthFormatConfig config, int
maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ return new FixedwidthBatchReader(config, maxRecords);
+ }
+ }
+
+ public FixedwidthFormatPlugin(String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ StoragePluginConfig storageConfig,
+ FixedwidthFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig,
formatConfig);
+ } //final?
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf,
FixedwidthFormatConfig pluginConfig) {
+ return EasyFormatConfig.builder()
+ .readable(true)
+ .writable(false)
+ .blockSplittable(false)
Review comment:
I think this actually might be blocksplittable.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,71 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.types.TypeProtos;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("fixedwidthReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedwidthFieldConfig {
+
+ private final TypeProtos.MinorType dataType;
+ private final String fieldName;
+ private final String dateTimeFormat;
+ private final int startIndex;
+ private final int fieldWidth;
+
+ public FixedwidthFieldConfig(@JsonProperty("dataType") TypeProtos.MinorType
dataType,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("dateTimeFormat") String
dateTimeFormat,
+ @JsonProperty("startIndex") int startIndex,
+ @JsonProperty("fieldWidth") int fieldWidth) {
+ this.dataType = dataType;
+ this.fieldName = fieldName;
+ this.dateTimeFormat = dateTimeFormat;
+ this.startIndex = startIndex;
+ this.fieldWidth = fieldWidth;
+ }
+
+ public TypeProtos.MinorType getDataType(){
+ return dataType;
+ }
+
+// public void setDataType(TypeProtos.MinorType dataType){
Review comment:
Please remove the commented-out stubs here and elsewhere.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
Review comment:
Nit: Please remove extra whitespace, here and elsewhere.
##########
File path:
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements
ManagedReader<FileSchemaNegotiator>{
+
+ private static final Logger logger =
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+ private FileSplit split;
+
+ private final int maxRecords;
+
+ private final FixedwidthFormatConfig config;
+
+ private CustomErrorContext errorContext;
+
+ private InputStream fsStream;
+
+ private ResultSetLoader loader;
+
+ private BufferedReader reader;
+
+ public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+ this.config = config;
+ this.maxRecords = maxRecords;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ errorContext = negotiator.parentErrorContext();
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ negotiator.tableSchema(buildSchema(),true);
+ loader = negotiator.build();
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+
+ reader = new BufferedReader(new InputStreamReader(fsStream,
Charsets.UTF_8));
+
+ return true;
+
+ }
+
+ @Override
+ public boolean next() { // Use loader to read data from file to turn into
Drill rows
+
+ String line;
+
+ try {
+ line = reader.readLine();
+ RowSetLoader writer = loader.writer();
+
+ while (!writer.isFull() && line != null) {
+
+ writer.start();
+ parseLine(line, writer);
+ writer.save();
+
+ line = reader.readLine();
+ }
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to read input file: {}",
split.getPath().toString())
+ .addContext(errorContext)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ return (line != null);
+ }
+
+ @Override
+ public void close() {
+ try {
+ fsStream.close();
Review comment:
Consider using `AutoCloseables` here. That will do all the error
handling for you.
```java
@Override
public void close() {
if (fsStream != null) {
AutoCloseables.closeSilently(fsStream);
fsStream = null;
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Fixed Width Format Plugin
> -------------------------
>
> Key: DRILL-7978
> URL: https://issues.apache.org/jira/browse/DRILL-7978
> Project: Apache Drill
> Issue Type: New Feature
> Components: Storage - Other
> Reporter: Megan Foss
> Priority: Major
>
> Developing format plugin to parse fixed width files.
> Fixed Width Text File Definition:
> https://www.oracle.com/webfolder/technetwork/data-quality/edqhelp/Content/introduction/getting_started/configuring_fixed_width_text_file_formats.htm
--
This message was sent by Atlassian Jira
(v8.3.4#803005)