[ 
https://issues.apache.org/jira/browse/DRILL-2835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801647#comment-17801647
 ] 

ASF GitHub Bot commented on DRILL-2835:
---------------------------------------

cgivre commented on code in PR #2836:
URL: https://github.com/apache/drill/pull/2836#discussion_r1439055155


##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import 
org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan 
scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // 
"schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, 
rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: 
%s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS 
schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for 
daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser 
uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new 
DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization 
after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", 
dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to 
Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to 
the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no 
more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do 
not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were 
retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty 
data file.
+    // We DO know that this won't be called if there is no space in the batch 
for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to 
parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip 
this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw 
UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation 
errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {

Review Comment:
   Same comment here.  Do we know what kind(s) of exceptions we may encounter 
here?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import 
org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan 
scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // 
"schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, 
rootNamespace);
+    } catch (Exception e) {

Review Comment:
   Can we catch a more specific exception type here than `Exception`?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, its children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // event calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each 
array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the 
DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken 
here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named 
field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a 
map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, 
ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = 
DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer 
(downcast perhaps?)
+      cw.setObject(ise.getInt());
+      break;
+    }
+    case BIGINT: {
+      cw.setObject(ise.getLong());
+      break;
+    }
+    case SMALLINT: {
+      cw.setObject(ise.getShort());
+      break;
+    }
+    case TINYINT: {
+      cw.setObject(ise.getByte());
+      break;
+    }
+//        .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)

Review Comment:
   Please remove this commented-out line.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.ParseResult;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.japi.io.InputSourceDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * DFDL Daffodil Streaming message parser
+ * <br/>
+ * You construct this providing a DataProcessor obtained from the
+ * DaffodilDataProcessorFactory.
+ * The DataProcessor contains the compiled DFDL schema, ready to use, as
+ * well as whether validation while parsing has been requested.
+ * <br/>
+ * The DataProcessor object may be shared/reused by multiple threads each of 
which
+ * has its own copy of this class.
+ * This object is, however, stateful, and must not be shared by multiple 
threads.
+ * <br/>
+ * You must call setInputStream, and setInfosetOutputter before
+ * you call parse().
+ * The input stream and the InfosetOutputter objects are also private to one 
thread and are stateful
+ * and owned by this object.
+ * Once you have called setInputStream, you should view the input stream as 
the private property of
+ * this object.
+ * The parse() will invoke the InfosetOutputter's methods to deliver
+ * parsed data, and it may optionally create diagnostics (obtained via 
getDiagnostics)
+ * indicating which kind they are via the getIsProcessingError, 
getIsValidationError.
+ * <br/>
+ * Note that the InfosetOutputter may be called many times before a processing 
error is detected,
+ * as Daffodil delivers result data incrementally.
+ * <br/>
+ * Validation errors do not affect the InfosetOutputter output calls, but 
indicate that data was
+ * detected that is invalid.
+ * <br/>
+ * When parse() returns, the parse has ended and one can check for 
errors/diagnostics.
+ * One can call parse() again if there is still data to consume, which is 
checked via the
+ * isEOF() method.
+ * <br/>
+ * There are no guarantees about where the input stream is positioned between 
parse() calls.
+ * In particular, it may not be positioned at the start of the next message, 
as Daffodil may
+ * have pre-fetched additional bytes from the input stream which it found are 
not part of the
+ * current infoset, but the next one.
+ * The positioning of the input stream may in fact be somewhere in the middle 
of a byte,
+ * as Daffodil does not require messages to be of lengths that are in whole 
byte units.
+ * Hence, once you give the input stream to this object via setInputStream, 
that input stream is
+ * owned privately by this class for ever after.
+ */
+public class DaffodilMessageParser {
+
+  /**
+   * Constructs the parser using a DataProcessor obtained from
+   * a DaffodilDataProcessorFactory.
+   * @param dp
+   */
+  DaffodilMessageParser(DataProcessor dp) {
+    this.dp = dp;
+  }
+
+  /**
+   * Provide the input stream from which data is to be parsed.
+   * <br/>
+   * This input stream is then owned by this object and becomes part of its 
state.
+   * <br/>
+   * It is, however, the responsibility of the caller to close this
+   * input stream after the completion of all parse calls.
+   * In particular, if a parse error is considered fatal, then
+   * the caller should close the input stream.
+   * There are advanced error-recovery techniques that may attempt to find
+   * data that can be parsed later in the data stream.
+   * In those cases the input stream would not be closed after a processing 
error,
+   * but such usage is beyond the scope of this javadoc.
+   * @param inputStream
+   */
+  public void setInputStream(InputStream inputStream) {
+    dis = new InputSourceDataInputStream(inputStream);
+  }
+
+  /**
+   * Provides the InfosetOutputter which will be called to deliver
+   * the Infoset via calls to its methods.
+   * @param outputter
+   */
+  public void setInfosetOutputter(InfosetOutputter outputter) {
+    this.outputter = outputter;
+  }
+
+  /**
+   * Called to pull messages from the data stream.
+   * The message 'Infoset' is delivered by way of calls to the 
InfosetOutputter's methods.
+   * <br/>
+   * After calling this, one may call isProcessingError, 
isValidationError, isEOF, and
+   * getDiagnostics.
+   */
+  public void parse() {
+    if (dis == null)
+      throw new IllegalStateException("Input stream must be provided by 
setInputStream() call.");
+    if (outputter == null)
+      throw new IllegalStateException("InfosetOutputter must be provided by 
setInfosetOutputter() call.");
+
+    reset();
+    ParseResult res = dp.parse(dis, outputter);
+    isProcessingError = res.isProcessingError();
+    isValidationError = res.isValidationError();
+    diagnostics = res.getDiagnostics();
+  }
+
+  /**
+   * True if the input stream is known to contain no more data.
+   * If the input stream is a true stream, not a file, then temporary 
unavailability of data
+   * may cause this call to block until the stream is closed from the other 
end, or data becomes
+   * available.
+   * <br/>
+   * False if the input stream still has data that can be obtained.
+   * It is an error to call parse() after isEOF has returned true.
+   * @return
+   */
+  public boolean isEOF() {
+    return !dis.hasData();
+  }
+
+  /**
+   * True if the parse() call failed with a processing error.
+   * This indicates that the data was not well-formed and could not be
+   * parsed successfully.
+   * <br/>
+   * It is possible for isProcessingError and isValidationError to both be 
true.
+   * @return
+   */
+  public boolean isProcessingError() { return isProcessingError; }
+
+  /**
+   * True if a validation error occurred during parsing.
+   * After a validation error has been detected, parsing may
+   * still go on to either succeed or fail.
+   * @return
+   */
+  public boolean isValidationError() { return isValidationError; }
+
+  /**
+   * After a parse() call this returns null or a list of 1 or more diagnostics.
+   * <br/>
+   * If isProcessingError or isValidationError are true, then this will 
contain at least 1
+   * diagnostic.
+   * If both are true this will contain at least 2 diagnostics.
+   * @return
+   */
+  public List<Diagnostic> getDiagnostics() { return diagnostics;  }
+  public String getDiagnosticsAsString() {
+    String result = diagnostics.stream()
+        .map(Diagnostic::getMessage)
+        .collect(Collectors.joining("\n"));
+    return result;
+  }
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilMessageParser.class);

Review Comment:
   See above comment about Drill style for classes.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import 
org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan 
scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // 
"schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, 
rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: 
%s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS 
schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for 
daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser 
uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new 
DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization 
after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", 
dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to 
Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to 
the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no 
more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do 
not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were 
retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty 
data file.
+    // We DO know that this won't be called if there is no space in the batch 
for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to 
parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip 
this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw 
UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation 
errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {
+        throw UserException.dataReadError(e).message("Error parsing file: " + 
e.getMessage())
+            .addContext(errorContext).build(logger);
+      }
+      rowSetLoader.save();
+    }
+    int nRows = rowSetLoader.rowCount();

Review Comment:
   Do we need this logic here?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import 
org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilBatchReader.class);
+  private final DaffodilFormatConfig dafConfig;
+  private final RowSetLoader rowSetLoader;
+  private final CustomErrorContext errorContext;
+  private final DaffodilMessageParser dafParser;
+  private final InputStream dataInputStream;
+
+  static class DaffodilReaderConfig {
+    final DaffodilFormatPlugin plugin;
+    DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+      this.plugin = plugin;
+    }
+  }
+
+  public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan 
scan, FileSchemaNegotiator negotiator) {
+
+    errorContext = negotiator.parentErrorContext();
+    this.dafConfig = readerConfig.plugin.getConfig();
+
+    String schemaURIString = dafConfig.getSchemaURI(); // 
"schema/complexArray1.dfdl.xsd";
+    String rootName = dafConfig.getRootName();
+    String rootNamespace = dafConfig.getRootNamespace();
+    boolean validationMode = dafConfig.getValidationMode();
+
+    URI dfdlSchemaURI;
+    try {
+      dfdlSchemaURI = new URI(schemaURIString);
+    } catch (URISyntaxException e) {
+      throw UserException.validationError(e)
+          .build(logger);
+    }
+
+    FileDescrip file = negotiator.file();
+    DrillFileSystem fs = file.fileSystem();
+    URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+    DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+    DataProcessor dp;
+    try {
+      dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, 
rootNamespace);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to get Daffodil DFDL processor for: 
%s", fsSchemaURI))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // Create the corresponding Drill schema.
+    // Note: this could be a very large schema. Think of a large complex RDBMS 
schema,
+    // all of it, hundreds of tables, but all part of the same metadata tree.
+    TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+    // Inform Drill about the schema
+    negotiator.tableSchema(drillSchema, true);
+
+    //
+    // DATA TIME: Next we construct the runtime objects, and open files.
+    //
+    // We get the DaffodilMessageParser, which is a stateful driver for 
daffodil that
+    // actually does the parsing.
+    rowSetLoader = negotiator.build().writer();
+
+    // We construct the Daffodil InfosetOutputter which the daffodil parser 
uses to
+    // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+    DaffodilDrillInfosetOutputter outputter = new 
DaffodilDrillInfosetOutputter(rowSetLoader);
+
+    // Now we can setup the dafParser with the outputter it will drive with
+    // the parser-produced infoset.
+    dafParser = new DaffodilMessageParser(dp); // needs further initialization 
after this.
+    dafParser.setInfosetOutputter(outputter);
+
+    Path dataPath = file.split().getPath();
+    // Lastly, we open the data stream
+    try {
+      dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+          .message(String.format("Failed to open input file: %s", 
dataPath.toString()))
+          .addContext(errorContext).addContext(e.getMessage()).build(logger);
+    }
+    // And lastly,... tell daffodil the input data stream.
+    dafParser.setInputStream(dataInputStream);
+  }
+
+
+  /**
+   * This is the core of actual processing - data movement from Daffodil to 
Drill.
+   * <p>
+   * If there is space in the batch, and there is data available to parse
+   * then this calls the daffodil parser, which parses data, delivering it to 
the rowWriter
+   * by way of the infoset outputter.
+   * <p>
+   * Repeats until the rowWriter is full (a batch is full), or there is no 
more data, or
+   * a parse error ends execution with a throw.
+   * <p>
+   * Validation errors and other warnings are not errors and are logged but do 
not cause
+   * parsing to fail/throw.
+   * @return true if there are rows retrieved, false if no rows were 
retrieved, which means
+   * no more will ever be retrieved (end of data).
+   * @throws RuntimeException on parse errors.
+   */
+  @Override
+  public boolean next() {
+    // Check assumed invariants
+    // We don't know if there is data or not. This could be called on an empty 
data file.
+    // We DO know that this won't be called if there is no space in the batch 
for even 1
+    // row.
+    if (dafParser.isEOF()) {
+      return false; // return without even checking for more rows or trying to 
parse.
+    }
+    while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip 
this loop.
+      // the predicate is always true once.
+      try {
+        dafParser.parse();
+        if (dafParser.isProcessingError()) {
+          assert(Objects.nonNull(dafParser.getDiagnostics()));
+          throw 
UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+              .addContext(errorContext).build(logger);
+        }
+        if (dafParser.isValidationError()) {
+          logger.warn(dafParser.getDiagnosticsAsString());
+          // Note that even if daffodil is set to not validate, validation 
errors may still occur
+          // from DFDL's "recoverableError" assertions.
+        }
+      } catch (Exception e) {
+        throw UserException.dataReadError(e).message("Error parsing file: " + 
e.getMessage())
+            .addContext(errorContext).build(logger);
+      }
+      rowSetLoader.save();
+    }
+    int nRows = rowSetLoader.rowCount();
+    assert nRows > 0; // This cannot be zero. If the parse failed we will have 
already thrown out of here.
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(dataInputStream);

Review Comment:
   Do we need to close the Daffodil parser as well, or is it OK to leave it open?



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element's corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, it's children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // even calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each 
array item is a map)
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the 
DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken 
here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named 
field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a 
map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, 
ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = 
DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer 
(downcast perhaps?)
+      cw.setObject(ise.getInt());

Review Comment:
   For these methods, I seem to recall that Drill has dedicated set 
methods for each data type. Is there a reason why you chose `setObject()`? 
I suspect that if there are methods for each data type, we should probably use 
them, since `setObject()` probably does a type check anyway.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;

Review Comment:
   Nit: Please use formatting consistent with Drill's style for this class. 
Usually we put the logger at the beginning, then the constructors, etc.



##########
contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.drill.categories.RowSetTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@Category(RowSetTest.class)
+public class TestDaffodilReader extends ClusterTest {
+
+  String schemaURIRoot = 
"file:///opt/drill/contrib/format-daffodil/src/test/resources/";
+  @BeforeClass
+  public static void setup() throws Exception {
+    // boilerplate call to start test rig
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    DaffodilFormatConfig formatConfig =
+        new DaffodilFormatConfig(null,
+            "",
+            "",
+            "",
+            false);
+
+    cluster.defineFormat("dfs", "daffodil", formatConfig);
+
+    // Needed to test against compressed files.
+    // Copies data from src/test/resources to the dfs root.
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("schema/"));
+  }
+
+  private String selectRow(String schema, String file) {
+    return "SELECT * FROM table(dfs.`data/" + file + "` " +
+        " (type => 'daffodil'," +
+        " validationMode => 'true', " +
+        " schemaURI => '" + schemaURIRoot + "schema/" + schema + ".dfdl.xsd'," 
+
+        " rootName => 'row'," +
+        " rootNamespace => null " +
+        "))";
+  }
+
+  /**
+   * This unit test tests a simple data file
+   *
+   * @throws Exception Throw exception if anything goes wrong
+   */
+  @Test
+  public void testSimpleQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("simple", "data01Int.dat.gz"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    // create the expected metadata and data for this test
+    // metadata first
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("col", MinorType.INT)
+        .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(0x00000101) // aka 257
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSimpleQuery2() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("simple","data06Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(6, results.rowCount());
+
+    // create the expected metadata and data for this test
+    // metadata first
+    TupleMetadata expectedSchema = new SchemaBuilder()
+            .add("col", MinorType.INT)
+            .buildSchema();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+            .addRow(0x00000101)
+            .addRow(0x00000102)
+            .addRow(0x00000103)
+            .addRow(0x00000104)
+            .addRow(0x00000105)
+            .addRow(0x00000106)
+            .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testComplexQuery1() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complex1", "data02Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();
+    assertEquals(1, results.rowCount());
+
+    RowSetReader rdr = results.reader();
+    rdr.next();
+    String col = rdr.getAsString();
+    assertEquals("{257, 258}", col);
+    assertFalse(rdr.next());
+    results.clear();
+  }
+
+  @Test
+  public void testComplexQuery2() throws Exception {
+
+    QueryBuilder qb = client.queryBuilder();
+    QueryBuilder query = qb.sql(selectRow("complex1", "data06Int.dat"));
+    RowSet results = query.rowSet();
+    results.print();

Review Comment:
   Please remove the printed output (e.g. `results.print()`) from the unit tests once they are ready.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, its children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // event calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)?
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+      cw.setObject(ise.getInt());
+      break;
+    }
+    case BIGINT: {
+      cw.setObject(ise.getLong());
+      break;
+    }
+    case SMALLINT: {
+      cw.setObject(ise.getShort());
+      break;
+    }
+    case TINYINT: {
+      cw.setObject(ise.getByte());
+      break;
+    }
+//        .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+//        .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+//        .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+//        .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+//        .put("INTEGER", TypeProtos.MinorType.BIGINT)
+//        .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+    case BIT: {
+      cw.setObject(ise.getBoolean());
+      break;
+    }
+//        .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+//        .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires conversion
+//        .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires conversion (maybe)
+    case FLOAT8: {
+      cw.setObject(ise.getDouble());
+      break;
+    }
+    case FLOAT4: {
+      cw.setObject(ise.getFloat());
+      break;
+    }
+    case VARBINARY: {
+      cw.setObject(ise.getHexBinary());
+      break;
+    }
+    case VARCHAR: {
+      //
+      // FIXME: VARCHAR is defined in drill as utf8 string.
+      // Is Drill expecting something other than a Java string in this setObject call?
+      // Should we be mapping Daffodil strings to Drill VAR16CHAR type?

Review Comment:
   I believe mapping to `VARCHAR` is the right answer here: we should map Daffodil
strings to Drill `VARCHAR` types. I wouldn't use `VAR16CHAR`, however.



##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+    extends InfosetOutputter {
+
+  private boolean isOriginalRoot() {
+    boolean result = currentTupleWriter() == rowSetWriter;
+    if (result)
+      assert(tupleWriterStack.size() == 1);
+    return result;
+  }
+
+  /**
+   * True if the next startComplex call will be for the
+   * DFDL infoset root element whose children are the columns of
+   * the row set.
+   */
+  private boolean isRootElement = true;
+
+  /**
+   * Stack that is used only if we have sub-structures that are not
+   * simple-type fields of the row.
+   */
+  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+  private TupleWriter currentTupleWriter() {
+    return tupleWriterStack.peek();
+  }
+
+  private ArrayWriter currentArrayWriter() {
+    return arrayWriterStack.peek();
+  }
+
+
+  private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+  private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+  private RowSetLoader rowSetWriter;
+
+  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+    this.rowSetWriter = writer;
+    this.tupleWriterStack.push(writer);
+  }
+
+  @Override
+  public void reset() {
+    tupleWriterStack.clear();
+    tupleWriterStack.push(rowSetWriter);
+    arrayWriterStack.clear();
+    this.isRootElement = true;
+    checkCleanState();
+  }
+
+  private void checkCleanState() {
+    assert(isOriginalRoot());
+    assert(arrayWriterStack.isEmpty());
+    assert(isRootElement);
+  }
+
+  @Override
+  public void startDocument() {
+    checkCleanState();
+  }
+
+  @Override
+  public void endDocument() {
+    checkCleanState();
+  }
+
+  private String colName(ElementMetadata md) {
+    return DrillDaffodilSchemaVisitor.makeColumnName(md);
+  }
+
+  @Override
+  public void startSimple(InfosetSimpleElement ise) {
+    assert (!isRootElement);
+    ElementMetadata md = ise.metadata();
+    String colName = colName(md);
+    ColumnWriter cw;
+    if (md.isArray()) {
+      // A simple type array
+      assert(!arrayWriterStack.isEmpty());
+      cw = currentArrayWriter().scalar();
+    } else {
+      // A simple element within a map
+      // Note the map itself might be an array
+      // but we don't care about that here.
+      cw = currentTupleWriter().column(colName);
+    }
+    ColumnMetadata cm = cw.schema();
+    assert(cm.isScalar());
+    if (md.isNillable() && ise.isNilled()) {
+      assert cm.isNullable();
+      cw.setNull();
+    } else {
+      convertDaffodilValueToDrillValue(ise, cm, cw);
+    }
+  }
+
+  @Override
+  public void endSimple(InfosetSimpleElement diSimple) {
+    assert (!isRootElement);
+    // do nothing
+  }
+
+  @Override
+  public void startComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    String colName = colName(ce.metadata());
+    if (isRootElement) {
+      assert(isOriginalRoot());
+      // This complex element corresponds to the root element of the
+      // DFDL schema. We don't treat this as a column of the row set.
+      // Rather, its children are the columns of the row set.
+      //
+      // If we do nothing at all here, then we'll start getting
+      // event calls for the children.
+      isRootElement = false;
+      return;
+    }
+    if (md.isArray()) {
+      assert(!arrayWriterStack.isEmpty());
+      // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)?
+      tupleWriterStack.push(currentArrayWriter().tuple());
+    } else {
+      tupleWriterStack.push(currentTupleWriter().tuple(colName));
+    }
+  }
+
+  @Override
+  public void endComplex(InfosetComplexElement ce) {
+    ComplexElementMetadata md = ce.metadata();
+    if (isOriginalRoot()) {
+      isRootElement = true;
+      // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+    } else {
+      // it's a map.
+      // We seem to not need to do anything to end the map. No action taken here works.
+      if (md.isArray()) {
+        assert (!arrayWriterStack.isEmpty());
+        currentArrayWriter().save(); // required for map array entries.
+      }
+      tupleWriterStack.pop();
+    }
+  }
+
+  @Override
+  public void startArray(InfosetArray diArray) {
+    ElementMetadata md = diArray.metadata();
+    assert (md.isArray());
+    // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+    // before you can have another array.
+    assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+    String colName = colName(md);
+    TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+    ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+    arrayWriterStack.push(aw);
+  }
+
+  @Override
+  public void endArray(InfosetArray ia) {
+    ElementMetadata md = ia.metadata();
+    assert (md.isArray());
+    assert (!arrayWriterStack.empty());
+    // FIXME: How do we end/close-out an array?
+    // note that each array instance, when the instance is a map, must have
+    // save called after it is written to the array but that happens
+    // in endComplex events since it must be called not once per array, but
+    // once per array item.
+    arrayWriterStack.pop();
+  }
+
+  private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+    PrimitiveType dafType = ise.metadata().primitiveType();
+    TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+    assert(drillType == cm.type());
+    switch (drillType) {
+    case INT: {
+      //
+      // FIXME: Javadoc for setObject says "primarily for testing"
+      // So how are we supposed to assign the column value then?
+      // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+      cw.setObject(ise.getInt());
+      break;
+    }
+    case BIGINT: {
+      cw.setObject(ise.getLong());
+      break;
+    }
+    case SMALLINT: {
+      cw.setObject(ise.getShort());
+      break;
+    }
+    case TINYINT: {
+      cw.setObject(ise.getByte());
+      break;
+    }
+//        .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+//        .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+//        .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+//        .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+//        .put("INTEGER", TypeProtos.MinorType.BIGINT)
+//        .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+    case BIT: {
+      cw.setObject(ise.getBoolean());
+      break;
+    }
+//        .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+//        .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires conversion
+//        .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires conversion (maybe)
+    case FLOAT8: {
+      cw.setObject(ise.getDouble());
+      break;
+    }
+    case FLOAT4: {
+      cw.setObject(ise.getFloat());
+      break;
+    }
+    case VARBINARY: {
+      cw.setObject(ise.getHexBinary());
+      break;
+    }
+    case VARCHAR: {
+      //
+      // FIXME: VARCHAR is defined in drill as utf8 string.
+      // Is Drill expecting something other than a Java string in this setObject call?
+      // Should we be mapping Daffodil strings to Drill VAR16CHAR type?
+      //
+      String s = ise.getString();
+      cw.setObject(s);
+      break;
+    }
+//        .put("TIME", TypeProtos.MinorType.TIME) // requires conversion

Review Comment:
   Does Daffodil support date, time, timestamp, and interval types? If so, we
should include those as well.





> IndexOutOfBoundsException in partition sender when doing streaming aggregate 
> with LIMIT 
> ----------------------------------------------------------------------------------------
>
>                 Key: DRILL-2835
>                 URL: https://issues.apache.org/jira/browse/DRILL-2835
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - RPC
>    Affects Versions: 0.8.0
>            Reporter: Aman Sinha
>            Assignee: Venki Korukanti
>            Priority: Major
>             Fix For: 0.9.0
>
>         Attachments: DRILL-2835-1.patch, DRILL-2835-2.patch
>
>
> Following CTAS run on a TPC-DS 100GB scale factor on a 10-node cluster: 
> {code}
> alter session set `planner.enable_hashagg` = false;
> alter session set `planner.enable_multiphase_agg` = true;
> create table dfs.tmp.stream9 as 
> select cr_call_center_sk , cr_catalog_page_sk ,  cr_item_sk , cr_reason_sk , 
> cr_refunded_addr_sk , count(*) from catalog_returns_dri100 
>  group by cr_call_center_sk , cr_catalog_page_sk ,  cr_item_sk , cr_reason_sk 
> , cr_refunded_addr_sk
>  limit 100
> ;
> {code}
> {code}
> Caused by: java.lang.IndexOutOfBoundsException: index: 1023, length: 1 
> (expected: range(0, 0))
>         at io.netty.buffer.DrillBuf.checkIndexD(DrillBuf.java:200) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
>         at io.netty.buffer.DrillBuf.chk(DrillBuf.java:222) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
>         at io.netty.buffer.DrillBuf.setByte(DrillBuf.java:621) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
>         at 
> org.apache.drill.exec.vector.UInt1Vector$Mutator.set(UInt1Vector.java:342) 
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.vector.NullableBigIntVector$Mutator.set(NullableBigIntVector.java:372)
>  ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.vector.NullableBigIntVector.copyFrom(NullableBigIntVector.java:284)
>  ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4$OutgoingRecordBatch.doEval(PartitionerTemplate.java:370)
>  ~[na:na]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4$OutgoingRecordBatch.copy(PartitionerTemplate.java:249)
>  ~[na:na]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4.doCopy(PartitionerTemplate.java:208)
>  ~[na:na]
>         at 
> org.apache.drill.exec.test.generated.PartitionerGen4.partitionBatch(PartitionerTemplate.java:176)
>  ~[na:na]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to