[ 
https://issues.apache.org/jira/browse/DRILL-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390244#comment-17390244
 ] 

ASF GitHub Bot commented on DRILL-7978:
---------------------------------------

cgivre commented on a change in pull request #2282:
URL: https://github.com/apache/drill/pull/2282#discussion_r679426218



##########
File path: contrib/format-fixedwidth/pom.xml
##########
@@ -0,0 +1,84 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">

Review comment:
       Nit: Can you please reformat with 2-space indentation?

##########
File path: 
contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,126 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    FixedwidthFormatConfig formatConfig = new 
FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+            Lists.newArrayList(
+            new FixedwidthFieldConfig(TypeProtos.MinorType.INT, "Number", "", 
1, 4),
+            new FixedwidthFieldConfig(TypeProtos.MinorType.VARCHAR, "Letter", 
"", 6, 4),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.INT,"Address","",11,3),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.DATE,"Date","MM-dd-yyyy",15,10),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.TIME,"Time","HH:mm:ss",26,8),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.TIMESTAMP,"DateTime","MM-dd-yyyy'T'HH:mm:ss.SSX",35,23)
+    ));
+    cluster.defineFormat("cp", "fwf", formatConfig);
+
+    // Needed for compressed file unit test
+    dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .addNullable("ID", TypeProtos.MinorType.FLOAT8)
+            .addNullable("Urban", TypeProtos.MinorType.FLOAT8)
+            .addNullable("Urban_value", TypeProtos.MinorType.VARCHAR)
+            .buildSchema();
+
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(47.0, 1.0, "Urban").addRow(53.0, 1.0, "Urban")
+            .addRow(66.0, 1.0, "Urban")
+            .build();
+
+    assertEquals(3, results.rowCount());
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testBatchReader() throws Exception {
+    String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .addNullable("Number", TypeProtos.MinorType.INT)
+            .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+            .addNullable("Address", TypeProtos.MinorType.INT)
+            .addNullable("Date", TypeProtos.MinorType.DATE)
+            .addNullable("Time",TypeProtos.MinorType.TIME)
+            .addNullable("DateTime",TypeProtos.MinorType.TIMESTAMP)
+            .buildSchema();
+
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(1234, "test", 567, LocalDate.parse("2021-02-10"), 
LocalTime.parse("10:30:27"), Instant.parse("2021-02-10T15:30:27.00Z"))
+            .addRow(5678, "TEST", 890, LocalDate.parse("2021-07-27"), 
LocalTime.parse("12:40:15"), Instant.parse("2021-07-27T16:40:15.00Z"))
+            .addRow(1111, "abcd", 111, LocalDate.parse("1111-11-11"), 
LocalTime.parse("11:11:11"), Instant.parse("1111-11-11T16:28:43.11Z"))
+            .addRow(2222, "efgh", 222, LocalDate.parse("2222-01-22"), 
LocalTime.parse("22:22:22"), Instant.parse("2222-01-23T03:22:22.22Z"))
+            .addRow(3333, "ijkl", 333, LocalDate.parse("3333-02-01"), 
LocalTime.parse("01:33:33"), Instant.parse("3333-02-01T06:33:33.33Z"))
+            .addRow(4444, "mnop", 444, LocalDate.parse("4444-03-02"), 
LocalTime.parse("02:44:44"), Instant.parse("4444-03-02T07:44:44.44Z"))
+            .addRow(5555, "qrst", 555, LocalDate.parse("5555-04-03"), 
LocalTime.parse("03:55:55"), Instant.parse("5555-04-03T07:55:55.55Z"))
+            .addRow(6666, "uvwx", 666, LocalDate.parse("6666-05-04"), 
LocalTime.parse("04:01:01"), Instant.parse("6666-05-04T08:01:01.01Z"))
+            .addRow(7777, "yzzz", 777, LocalDate.parse("7777-06-05"), 
LocalTime.parse("05:11:11"), Instant.parse("7777-06-05T09:11:11.11Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .build();
+
+    System.out.println(expected);
+    assertEquals(25, results.rowCount());
+
+    //System.out.println(results.batchSchema());

Review comment:
       Please remove any calls to `System.out.println` in unit tests. 

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+
+    return true;
+
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into 
Drill rows
+
+    String line;
+
+    try {
+      line = reader.readLine();
+      RowSetLoader writer = loader.writer();
+
+      while (!writer.isFull() && line != null) {
+
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+
+        line = reader.readLine();
+      }
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to read input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+    return (line != null);
+  }
+
+  @Override
+  public void close() {
+    try {
+      fsStream.close();
+      loader.close();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to close input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+  }
+
+  private TupleMetadata buildSchema(){
+    SchemaBuilder builder = new SchemaBuilder();
+
+    for (FixedwidthFieldConfig field : config.getFields()){
+      builder.addNullable(field.getFieldName(),field.getDataType());
+    }
+
+      return builder.buildSchema();
+  }
+
+  private void parseLine(String line, RowSetLoader writer) {
+    int i = 0;
+    TypeProtos.MinorType dataType;
+    String dateTimeFormat;
+    String value;
+
+    for (FixedwidthFieldConfig field : config.getFields()) {
+      value = line.substring(field.getStartIndex() - 1, field.getStartIndex() 
+ field.getFieldWidth() - 1);
+
+      dataType = field.getDataType();
+      dateTimeFormat = field.getDateTimeFormat();
+      DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+
+      switch (dataType) {
+        case INT:
+          writer.scalar(i).setInt(Integer.parseInt(value));
+          break;
+        case VARCHAR:
+          writer.scalar(i).setString(value);
+          break;
+        case DATE:
+          LocalDate date = LocalDate.parse(value, formatter);
+          writer.scalar(i).setDate(date);
+          break;
+        case TIME:
+          LocalTime time = LocalTime.parse(value, formatter);
+          writer.scalar(i).setTime(time);
+          break;
+        case TIMESTAMP:
+          LocalDateTime ldt = LocalDateTime.parse(value,formatter);
+          ZoneId z = ZoneId.of( "America/Toronto" );
+          ZonedDateTime zdt = ldt.atZone( z );
+          Instant timeStamp = zdt.toInstant();
+          writer.scalar(i).setTimestamp(timeStamp);
+          break;
+        default:
+          throw new RuntimeException("Unknown data type specified in fixed 
width. Found data type " + dataType);
+

Review comment:
       Nit: Please remove excess whitespace here and elsewhere.

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+
+    return true;
+
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into 
Drill rows
+
+    String line;
+
+    try {
+      line = reader.readLine();
+      RowSetLoader writer = loader.writer();
+
+      while (!writer.isFull() && line != null) {
+
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+
+        line = reader.readLine();
+      }
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to read input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+    return (line != null);
+  }
+
+  @Override
+  public void close() {
+    try {
+      fsStream.close();
+      loader.close();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to close input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+  }
+
+  private TupleMetadata buildSchema(){
+    SchemaBuilder builder = new SchemaBuilder();
+
+    for (FixedwidthFieldConfig field : config.getFields()){
+      builder.addNullable(field.getFieldName(),field.getDataType());
+    }
+
+      return builder.buildSchema();
+  }
+
+  private void parseLine(String line, RowSetLoader writer) {
+    int i = 0;
+    TypeProtos.MinorType dataType;
+    String dateTimeFormat;
+    String value;
+
+    for (FixedwidthFieldConfig field : config.getFields()) {
+      value = line.substring(field.getStartIndex() - 1, field.getStartIndex() 
+ field.getFieldWidth() - 1);
+
+      dataType = field.getDataType();
+      dateTimeFormat = field.getDateTimeFormat();
+      DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+
+      switch (dataType) {
+        case INT:

Review comment:
       It looks like we are only supporting INT, VARCHAR, DATE, TIME and 
TIMESTAMP.  Do we want to support a few other data types such as `LONG` or 
`DOUBLE`?

##########
File path: 
contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,126 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    FixedwidthFormatConfig formatConfig = new 
FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+            Lists.newArrayList(
+            new FixedwidthFieldConfig(TypeProtos.MinorType.INT, "Number", "", 
1, 4),
+            new FixedwidthFieldConfig(TypeProtos.MinorType.VARCHAR, "Letter", 
"", 6, 4),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.INT,"Address","",11,3),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.DATE,"Date","MM-dd-yyyy",15,10),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.TIME,"Time","HH:mm:ss",26,8),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.TIMESTAMP,"DateTime","MM-dd-yyyy'T'HH:mm:ss.SSX",35,23)
+    ));
+    cluster.defineFormat("cp", "fwf", formatConfig);
+
+    // Needed for compressed file unit test
+    dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .addNullable("ID", TypeProtos.MinorType.FLOAT8)
+            .addNullable("Urban", TypeProtos.MinorType.FLOAT8)
+            .addNullable("Urban_value", TypeProtos.MinorType.VARCHAR)
+            .buildSchema();
+
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(47.0, 1.0, "Urban").addRow(53.0, 1.0, "Urban")
+            .addRow(66.0, 1.0, "Urban")
+            .build();
+
+    assertEquals(3, results.rowCount());
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testBatchReader() throws Exception {
+    String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .addNullable("Number", TypeProtos.MinorType.INT)
+            .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+            .addNullable("Address", TypeProtos.MinorType.INT)
+            .addNullable("Date", TypeProtos.MinorType.DATE)
+            .addNullable("Time",TypeProtos.MinorType.TIME)
+            .addNullable("DateTime",TypeProtos.MinorType.TIMESTAMP)
+            .buildSchema();
+
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(1234, "test", 567, LocalDate.parse("2021-02-10"), 
LocalTime.parse("10:30:27"), Instant.parse("2021-02-10T15:30:27.00Z"))
+            .addRow(5678, "TEST", 890, LocalDate.parse("2021-07-27"), 
LocalTime.parse("12:40:15"), Instant.parse("2021-07-27T16:40:15.00Z"))
+            .addRow(1111, "abcd", 111, LocalDate.parse("1111-11-11"), 
LocalTime.parse("11:11:11"), Instant.parse("1111-11-11T16:28:43.11Z"))
+            .addRow(2222, "efgh", 222, LocalDate.parse("2222-01-22"), 
LocalTime.parse("22:22:22"), Instant.parse("2222-01-23T03:22:22.22Z"))
+            .addRow(3333, "ijkl", 333, LocalDate.parse("3333-02-01"), 
LocalTime.parse("01:33:33"), Instant.parse("3333-02-01T06:33:33.33Z"))
+            .addRow(4444, "mnop", 444, LocalDate.parse("4444-03-02"), 
LocalTime.parse("02:44:44"), Instant.parse("4444-03-02T07:44:44.44Z"))
+            .addRow(5555, "qrst", 555, LocalDate.parse("5555-04-03"), 
LocalTime.parse("03:55:55"), Instant.parse("5555-04-03T07:55:55.55Z"))
+            .addRow(6666, "uvwx", 666, LocalDate.parse("6666-05-04"), 
LocalTime.parse("04:01:01"), Instant.parse("6666-05-04T08:01:01.01Z"))
+            .addRow(7777, "yzzz", 777, LocalDate.parse("7777-06-05"), 
LocalTime.parse("05:11:11"), Instant.parse("7777-06-05T09:11:11.11Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .build();
+
+    System.out.println(expected);
+    assertEquals(25, results.rowCount());
+
+    //System.out.println(results.batchSchema());
+    System.out.println(results);
+
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+    System.out.println("Test complete.");

Review comment:
       These last two lines are not necessary.

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+
+    return true;
+
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into 
Drill rows
+
+    String line;
+
+    try {
+      line = reader.readLine();
+      RowSetLoader writer = loader.writer();
+
+      while (!writer.isFull() && line != null) {
+
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+
+        line = reader.readLine();
+      }
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to read input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+    return (line != null);
+  }
+
+  @Override
+  public void close() {
+    try {
+      fsStream.close();
+      loader.close();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to close input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+  }
+
+  private TupleMetadata buildSchema(){
+    SchemaBuilder builder = new SchemaBuilder();
+
+    for (FixedwidthFieldConfig field : config.getFields()){
+      builder.addNullable(field.getFieldName(),field.getDataType());
+    }
+
+      return builder.buildSchema();
+  }
+
+  private void parseLine(String line, RowSetLoader writer) {

Review comment:
       See comment above but I'd recommend making this method return a boolean 
value. 

##########
File path: 
contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java
##########
@@ -0,0 +1,126 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+
+@Category(RowSetTests.class)
+public class TestFixedwidthRecordReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    FixedwidthFormatConfig formatConfig = new 
FixedwidthFormatConfig(Lists.newArrayList("fwf"),
+            Lists.newArrayList(
+            new FixedwidthFieldConfig(TypeProtos.MinorType.INT, "Number", "", 
1, 4),
+            new FixedwidthFieldConfig(TypeProtos.MinorType.VARCHAR, "Letter", 
"", 6, 4),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.INT,"Address","",11,3),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.DATE,"Date","MM-dd-yyyy",15,10),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.TIME,"Time","HH:mm:ss",26,8),
+            new 
FixedwidthFieldConfig(TypeProtos.MinorType.TIMESTAMP,"DateTime","MM-dd-yyyy'T'HH:mm:ss.SSX",35,23)
+    ));
+    cluster.defineFormat("cp", "fwf", formatConfig);
+
+    // Needed for compressed file unit test
+    dirTestWatcher.copyResourceToRoot(Paths.get("fwf/"));
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "SELECT ID, Urban, Urban_value FROM dfs.`spss/testdata.sav` 
WHERE d16=4";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .addNullable("ID", TypeProtos.MinorType.FLOAT8)
+            .addNullable("Urban", TypeProtos.MinorType.FLOAT8)
+            .addNullable("Urban_value", TypeProtos.MinorType.VARCHAR)
+            .buildSchema();
+
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(47.0, 1.0, "Urban").addRow(53.0, 1.0, "Urban")
+            .addRow(66.0, 1.0, "Urban")
+            .build();
+
+    assertEquals(3, results.rowCount());
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testBatchReader() throws Exception {
+    String sql = "SELECT * FROM cp.`fwf/test.fwf`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .addNullable("Number", TypeProtos.MinorType.INT)
+            .addNullable("Letter", TypeProtos.MinorType.VARCHAR)
+            .addNullable("Address", TypeProtos.MinorType.INT)
+            .addNullable("Date", TypeProtos.MinorType.DATE)
+            .addNullable("Time",TypeProtos.MinorType.TIME)
+            .addNullable("DateTime",TypeProtos.MinorType.TIMESTAMP)
+            .buildSchema();
+
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+            .addRow(1234, "test", 567, LocalDate.parse("2021-02-10"), 
LocalTime.parse("10:30:27"), Instant.parse("2021-02-10T15:30:27.00Z"))
+            .addRow(5678, "TEST", 890, LocalDate.parse("2021-07-27"), 
LocalTime.parse("12:40:15"), Instant.parse("2021-07-27T16:40:15.00Z"))
+            .addRow(1111, "abcd", 111, LocalDate.parse("1111-11-11"), 
LocalTime.parse("11:11:11"), Instant.parse("1111-11-11T16:28:43.11Z"))
+            .addRow(2222, "efgh", 222, LocalDate.parse("2222-01-22"), 
LocalTime.parse("22:22:22"), Instant.parse("2222-01-23T03:22:22.22Z"))
+            .addRow(3333, "ijkl", 333, LocalDate.parse("3333-02-01"), 
LocalTime.parse("01:33:33"), Instant.parse("3333-02-01T06:33:33.33Z"))
+            .addRow(4444, "mnop", 444, LocalDate.parse("4444-03-02"), 
LocalTime.parse("02:44:44"), Instant.parse("4444-03-02T07:44:44.44Z"))
+            .addRow(5555, "qrst", 555, LocalDate.parse("5555-04-03"), 
LocalTime.parse("03:55:55"), Instant.parse("5555-04-03T07:55:55.55Z"))
+            .addRow(6666, "uvwx", 666, LocalDate.parse("6666-05-04"), 
LocalTime.parse("04:01:01"), Instant.parse("6666-05-04T08:01:01.01Z"))
+            .addRow(7777, "yzzz", 777, LocalDate.parse("7777-06-05"), 
LocalTime.parse("05:11:11"), Instant.parse("7777-06-05T09:11:11.11Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .addRow(8888, "aabb", 888, LocalDate.parse("8888-07-06"), 
LocalTime.parse("06:22:22"), Instant.parse("8888-07-07T10:22:22.22Z"))
+            .build();
+
+    System.out.println(expected);
+    assertEquals(25, results.rowCount());
+
+    //System.out.println(results.batchSchema());
+    System.out.println(results);
+
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+    System.out.println("Test complete.");

Review comment:
       Please add tests for:
   
   * Serialization/Deserialization
   * Compressed file
   * Various invalid schemata.  For instance, what happens if you don't have 
fields defined in the config and try to query the data?

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,71 @@
+package org.apache.drill.exec.store.fixedwidth;

Review comment:
       Please add the Apache license header to all files. 

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+
+    return true;
+
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into 
Drill rows
+

Review comment:
       
   There is a small issue with the `next()` function as written.  You define 
`maxRecords` but don't do anything with it.  `maxRecords` is the `LIMIT` 
that gets pushed down from the query.  The idea is that if a user runs 
`SELECT ... LIMIT 10`, your reader should stop reading as soon as that limit is 
reached, which means that the `next()` function should return `false` once the 
limit has been reached.  The good news is that your `writer` object actually 
has a method called `limitReached(<maxRecords>)` which will track this for you.
   
   Another thing you might consider, which would clean up the code a bit, 
is making the `parseLine` method return `true` if there is more data to 
read and `false` if not.  If you also move the writer's `start()` and `save()` 
calls into that method, you could have a `next` method that looks like this:
   
   ```java
   @Override
     public boolean next() {
       recordCount = 0;
       while (!writer.isFull()) {
         if (!parseLine(writer)) {
           return false;
         }
       }
       return true;
     }
   ```   
   

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,71 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.types.TypeProtos;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("fixedwidthReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)

Review comment:
       For all serializable classes in Drill, I would recommend overriding the 
`equals()`, `hashCode()`, and `toString()` methods. 

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+
+    return true;
+
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into 
Drill rows
+
+    String line;
+
+    try {
+      line = reader.readLine();
+      RowSetLoader writer = loader.writer();
+
+      while (!writer.isFull() && line != null) {
+
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+
+        line = reader.readLine();
+      }
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to read input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+    return (line != null);
+  }
+
+  @Override
+  public void close() {
+    try {
+      fsStream.close();
+      loader.close();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to close input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+  }
+
+  private TupleMetadata buildSchema(){
+    SchemaBuilder builder = new SchemaBuilder();
+
+    for (FixedwidthFieldConfig field : config.getFields()){
+      builder.addNullable(field.getFieldName(),field.getDataType());
+    }
+
+      return builder.buildSchema();
+  }
+
+  private void parseLine(String line, RowSetLoader writer) {
+    int i = 0;
+    TypeProtos.MinorType dataType;
+    String dateTimeFormat;
+    String value;
+
+    for (FixedwidthFieldConfig field : config.getFields()) {
+      value = line.substring(field.getStartIndex() - 1, field.getStartIndex() 
+ field.getFieldWidth() - 1);
+
+      dataType = field.getDataType();
+      dateTimeFormat = field.getDateTimeFormat();
+      DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern(dateTimeFormat, Locale.ENGLISH);
+
+      switch (dataType) {
+        case INT:
+          writer.scalar(i).setInt(Integer.parseInt(value));
+          break;
+        case VARCHAR:
+          writer.scalar(i).setString(value);
+          break;
+        case DATE:
+          LocalDate date = LocalDate.parse(value, formatter);
+          writer.scalar(i).setDate(date);
+          break;
+        case TIME:
+          LocalTime time = LocalTime.parse(value, formatter);
+          writer.scalar(i).setTime(time);
+          break;
+        case TIMESTAMP:
+          LocalDateTime ldt = LocalDateTime.parse(value,formatter);
+          ZoneId z = ZoneId.of( "America/Toronto" );
+          ZonedDateTime zdt = ldt.atZone( z );
+          Instant timeStamp = zdt.toInstant();
+          writer.scalar(i).setTimestamp(timeStamp);
+          break;
+        default:
+          throw new RuntimeException("Unknown data type specified in fixed 
width. Found data type " + dataType);

Review comment:
       Please convert this to a `UserException`. 

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+
+    return true;
+
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into 
Drill rows
+
+    String line;
+
+    try {
+      line = reader.readLine();
+      RowSetLoader writer = loader.writer();
+
+      while (!writer.isFull() && line != null) {
+
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+
+        line = reader.readLine();
+      }
+    } catch (Exception e) {

Review comment:
       I'd suggest tightening this a bit.  Instead of catching a generic 
`Exception`, perhaps catch specific exceptions with different error messages.  
Alternatively, just put the try/catch around the line(s) in which data is being 
read.  

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatPlugin.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
+
+public class FixedwidthFormatPlugin extends 
EasyFormatPlugin<FixedwidthFormatConfig> {
+
+  protected static final String DEFAULT_NAME = "fixedwidth";
+
+  private static class FixedwidthReaderFactory extends FileReaderFactory {
+
+    private final FixedwidthFormatConfig config;
+    private final int maxRecords;
+
+    public FixedwidthReaderFactory(FixedwidthFormatConfig config, int 
maxRecords) {
+      this.config = config;
+      this.maxRecords = maxRecords;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new FixedwidthBatchReader(config, maxRecords);
+    }
+  }
+
+  public FixedwidthFormatPlugin(String name,
+                                DrillbitContext context,
+                                Configuration fsConf,
+                                StoragePluginConfig storageConfig,
+                                FixedwidthFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, 
formatConfig);
+  } //final?
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, 
FixedwidthFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+            .readable(true)
+            .writable(false)
+            .blockSplittable(false)

Review comment:
       I think this might actually be block-splittable.  

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFieldConfig.java
##########
@@ -0,0 +1,71 @@
+package org.apache.drill.exec.store.fixedwidth;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.types.TypeProtos;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("fixedwidthReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class FixedwidthFieldConfig {
+
+  private final TypeProtos.MinorType dataType;
+  private final String fieldName;
+  private final String dateTimeFormat;
+  private final int startIndex;
+  private final int fieldWidth;
+
+  public FixedwidthFieldConfig(@JsonProperty("dataType") TypeProtos.MinorType 
dataType,
+                               @JsonProperty("fieldName") String fieldName,
+                               @JsonProperty("dateTimeFormat") String 
dateTimeFormat,
+                               @JsonProperty("startIndex") int startIndex,
+                               @JsonProperty("fieldWidth") int fieldWidth) {
+    this.dataType = dataType;
+    this.fieldName = fieldName;
+    this.dateTimeFormat = dateTimeFormat;
+    this.startIndex = startIndex;
+    this.fieldWidth = fieldWidth;
+  }
+
+  public TypeProtos.MinorType getDataType(){
+    return dataType;
+  }
+
+//  public void setDataType(TypeProtos.MinorType dataType){

Review comment:
       Please remove commented-out code (such as this setter) here and elsewhere.

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+

Review comment:
       Nit:  Please remove extra whitespace, here and elsewhere.

##########
File path: 
contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements 
ManagedReader<FileSchemaNegotiator>{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(FixedwidthBatchReader.class);
+
+  private FileSplit split;
+
+  private final int maxRecords;
+
+  private final FixedwidthFormatConfig config;
+
+  private CustomErrorContext errorContext;
+
+  private InputStream fsStream;
+
+  private ResultSetLoader loader;
+
+  private BufferedReader reader;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    try {
+      fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(),true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to open input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+
+    reader = new BufferedReader(new InputStreamReader(fsStream, 
Charsets.UTF_8));
+
+    return true;
+
+  }
+
+  @Override
+  public boolean next() { // Use loader to read data from file to turn into 
Drill rows
+
+    String line;
+
+    try {
+      line = reader.readLine();
+      RowSetLoader writer = loader.writer();
+
+      while (!writer.isFull() && line != null) {
+
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+
+        line = reader.readLine();
+      }
+    } catch (Exception e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed to read input file: {}", 
split.getPath().toString())
+              .addContext(errorContext)
+              .addContext(e.getMessage())
+              .build(logger);
+    }
+    return (line != null);
+  }
+
+  @Override
+  public void close() {
+    try {
+      fsStream.close();

Review comment:
       Consider using `AutoCloseables` here.  That will do all the error 
handling for you. 
   
   ```java
    @Override
     public void close() {
       if (fsStream != null) {
         AutoCloseables.closeSilently(fsStream);
         fsStream = null;
       }
     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Fixed Width Format Plugin
> -------------------------
>
>                 Key: DRILL-7978
>                 URL: https://issues.apache.org/jira/browse/DRILL-7978
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components: Storage - Other
>            Reporter: Megan Foss
>            Priority: Major
>
> Developing format plugin to parse fixed width files.
> Fixed Width Text File Definition: 
> https://www.oracle.com/webfolder/technetwork/data-quality/edqhelp/Content/introduction/getting_started/configuring_fixed_width_text_file_formats.htm



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to