[
https://issues.apache.org/jira/browse/DRILL-2835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801647#comment-17801647
]
ASF GitHub Bot commented on DRILL-2835:
---------------------------------------
cgivre commented on code in PR #2836:
URL: https://github.com/apache/drill/pull/2836#discussion_r1439055155
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+ private final DaffodilFormatConfig dafConfig;
+ private final RowSetLoader rowSetLoader;
+ private final CustomErrorContext errorContext;
+ private final DaffodilMessageParser dafParser;
+ private final InputStream dataInputStream;
+
+ static class DaffodilReaderConfig {
+ final DaffodilFormatPlugin plugin;
+ DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+ }
+
+ public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+ errorContext = negotiator.parentErrorContext();
+ this.dafConfig = readerConfig.plugin.getConfig();
+
+ String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+ String rootName = dafConfig.getRootName();
+ String rootNamespace = dafConfig.getRootNamespace();
+ boolean validationMode = dafConfig.getValidationMode();
+
+ URI dfdlSchemaURI;
+ try {
+ dfdlSchemaURI = new URI(schemaURIString);
+ } catch (URISyntaxException e) {
+ throw UserException.validationError(e)
+ .build(logger);
+ }
+
+ FileDescrip file = negotiator.file();
+ DrillFileSystem fs = file.fileSystem();
+ URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ DataProcessor dp;
+ try {
+ dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Create the corresponding Drill schema.
+ // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+ // all of it, hundreds of tables, but all part of the same metadata tree.
+ TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+ // Inform Drill about the schema
+ negotiator.tableSchema(drillSchema, true);
+
+ //
+ // DATA TIME: Next we construct the runtime objects, and open files.
+ //
+ // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+ // actually does the parsing.
+ rowSetLoader = negotiator.build().writer();
+
+ // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+ // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+ DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+ // Now we can setup the dafParser with the outputter it will drive with
+ // the parser-produced infoset.
+ dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+ dafParser.setInfosetOutputter(outputter);
+
+ Path dataPath = file.split().getPath();
+ // Lastly, we open the data stream
+ try {
+ dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to open input file: %s", dataPath.toString()))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Finally, tell daffodil the input data stream.
+ dafParser.setInputStream(dataInputStream);
+ }
+
+
+ /**
+ * This is the core of actual processing - data movement from Daffodil to Drill.
+ * <p>
+ * If there is space in the batch, and there is data available to parse
+ * then this calls the daffodil parser, which parses data, delivering it to the rowWriter
+ * by way of the infoset outputter.
+ * <p>
+ * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+ * a parse error ends execution with a throw.
+ * <p>
+ * Validation errors and other warnings are not errors and are logged but do not cause
+ * parsing to fail/throw.
+ * @return true if there are rows retrieved, false if no rows were retrieved, which means
+ * no more will ever be retrieved (end of data).
+ * @throws RuntimeException on parse errors.
+ */
+ @Override
+ public boolean next() {
+ // Check assumed invariants
+ // We don't know if there is data or not. This could be called on an empty data file.
+ // We DO know that this won't be called if there is no space in the batch for even 1 row.
+ if (dafParser.isEOF()) {
+ return false; // return without even checking for more rows or trying to parse.
+ }
+ while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+ // the predicate is always true once.
+ try {
+ dafParser.parse();
+ if (dafParser.isProcessingError()) {
+ assert(Objects.nonNull(dafParser.getDiagnostics()));
+ throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+ .addContext(errorContext).build(logger);
+ }
+ if (dafParser.isValidationError()) {
+ logger.warn(dafParser.getDiagnosticsAsString());
+ // Note that even if daffodil is set to not validate, validation errors may still occur
+ // from DFDL's "recoverableError" assertions.
+ }
+ } catch (Exception e) {
Review Comment:
Same comment here. Do we know what kind(s) of exceptions we may encounter
here?
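
A hedged sketch of one way to narrow this: Drill's UserException is itself a RuntimeException, so the handler can rethrow it rather than re-wrap it, and treat anything else (I/O trouble, unexpected Daffodil runtime errors) as a data-read error. The split below is illustrative, pending confirmation of what the Daffodil japi actually throws.

```java
try {
  dafParser.parse();
  // ... processing/validation error checks as above ...
} catch (UserException e) {
  throw e; // already a well-formed Drill error; avoid double-wrapping
} catch (Exception e) {
  // Anything else becomes a data-read error with the parse context attached.
  throw UserException.dataReadError(e)
      .message("Error parsing file: " + e.getMessage())
      .addContext(errorContext)
      .build(logger);
}
```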
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+ private final DaffodilFormatConfig dafConfig;
+ private final RowSetLoader rowSetLoader;
+ private final CustomErrorContext errorContext;
+ private final DaffodilMessageParser dafParser;
+ private final InputStream dataInputStream;
+
+ static class DaffodilReaderConfig {
+ final DaffodilFormatPlugin plugin;
+ DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+ }
+
+ public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+ errorContext = negotiator.parentErrorContext();
+ this.dafConfig = readerConfig.plugin.getConfig();
+
+ String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+ String rootName = dafConfig.getRootName();
+ String rootNamespace = dafConfig.getRootNamespace();
+ boolean validationMode = dafConfig.getValidationMode();
+
+ URI dfdlSchemaURI;
+ try {
+ dfdlSchemaURI = new URI(schemaURIString);
+ } catch (URISyntaxException e) {
+ throw UserException.validationError(e)
+ .build(logger);
+ }
+
+ FileDescrip file = negotiator.file();
+ DrillFileSystem fs = file.fileSystem();
+ URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ DataProcessor dp;
+ try {
+ dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+ } catch (Exception e) {
Review Comment:
Can we be more specific here?
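
To make the question concrete, a sketch of one possible narrowing. It assumes the factory can surface IOException for an unreadable schema; the read-versus-compile split is illustrative, since the exact exception types depend on what DaffodilDataProcessorFactory declares.

```java
try {
  dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
} catch (java.io.IOException e) {
  // The schema could not be fetched or read at all.
  throw UserException.dataReadError(e)
      .message(String.format("Failed to read DFDL schema: %s", fsSchemaURI))
      .addContext(errorContext).build(logger);
} catch (Exception e) {
  // The schema was readable but failed to compile (or another factory-level failure).
  throw UserException.validationError(e)
      .message(String.format("Failed to compile DFDL schema: %s", fsSchemaURI))
      .addContext(errorContext).build(logger);
}
```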
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+ extends InfosetOutputter {
+
+ private boolean isOriginalRoot() {
+ boolean result = currentTupleWriter() == rowSetWriter;
+ if (result)
+ assert(tupleWriterStack.size() == 1);
+ return result;
+ }
+
+ /**
+ * True if the next startComplex call will be for the
+ * DFDL infoset root element whose children are the columns of
+ * the row set.
+ */
+ private boolean isRootElement = true;
+
+ /**
+ * Stack that is used only if we have sub-structures that are not
+ * simple-type fields of the row.
+ */
+ private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+ private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+ private TupleWriter currentTupleWriter() {
+ return tupleWriterStack.peek();
+ }
+
+ private ArrayWriter currentArrayWriter() {
+ return arrayWriterStack.peek();
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+ private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+ private RowSetLoader rowSetWriter;
+
+ public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+ this.rowSetWriter = writer;
+ this.tupleWriterStack.push(writer);
+ }
+
+ @Override
+ public void reset() {
+ tupleWriterStack.clear();
+ tupleWriterStack.push(rowSetWriter);
+ arrayWriterStack.clear();
+ this.isRootElement = true;
+ checkCleanState();
+ }
+
+ private void checkCleanState() {
+ assert(isOriginalRoot());
+ assert(arrayWriterStack.isEmpty());
+ assert(isRootElement);
+ }
+
+ @Override
+ public void startDocument() {
+ checkCleanState();
+ }
+
+ @Override
+ public void endDocument() {
+ checkCleanState();
+ }
+
+ private String colName(ElementMetadata md) {
+ return DrillDaffodilSchemaVisitor.makeColumnName(md);
+ }
+
+ @Override
+ public void startSimple(InfosetSimpleElement ise) {
+ assert (!isRootElement);
+ ElementMetadata md = ise.metadata();
+ String colName = colName(md);
+ ColumnWriter cw;
+ if (md.isArray()) {
+ // A simple type array
+ assert(!arrayWriterStack.isEmpty());
+ cw = currentArrayWriter().scalar();
+ } else {
+ // A simple element within a map
+ // Note the map itself might be an array
+ // but we don't care about that here.
+ cw = currentTupleWriter().column(colName);
+ }
+ ColumnMetadata cm = cw.schema();
+ assert(cm.isScalar());
+ if (md.isNillable() && ise.isNilled()) {
+ assert cm.isNullable();
+ cw.setNull();
+ } else {
+ convertDaffodilValueToDrillValue(ise, cm, cw);
+ }
+ }
+
+ @Override
+ public void endSimple(InfosetSimpleElement diSimple) {
+ assert (!isRootElement);
+ // do nothing
+ }
+
+ @Override
+ public void startComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ String colName = colName(ce.metadata());
+ if (isRootElement) {
+ assert(isOriginalRoot());
+ // This complex element corresponds to the root element of the
+ // DFDL schema. We don't treat this as a column of the row set.
+ // Rather, its children are the columns of the row set.
+ //
+ // If we do nothing at all here, then we'll start getting
+ // event calls for the children.
+ isRootElement = false;
+ return;
+ }
+ if (md.isArray()) {
+ assert(!arrayWriterStack.isEmpty());
+ // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)
+ tupleWriterStack.push(currentArrayWriter().tuple());
+ } else {
+ tupleWriterStack.push(currentTupleWriter().tuple(colName));
+ }
+ }
+
+ @Override
+ public void endComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ if (isOriginalRoot()) {
+ isRootElement = true;
+ // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+ } else {
+ // it's a map.
+ // We seem to not need to do anything to end the map; taking no action here works.
+ if (md.isArray()) {
+ assert (!arrayWriterStack.isEmpty());
+ currentArrayWriter().save(); // required for map array entries.
+ }
+ tupleWriterStack.pop();
+ }
+ }
+
+ @Override
+ public void startArray(InfosetArray diArray) {
+ ElementMetadata md = diArray.metadata();
+ assert (md.isArray());
+ // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+ // before you can have another array.
+ assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+ String colName = colName(md);
+ TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+ ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+ arrayWriterStack.push(aw);
+ }
+
+ @Override
+ public void endArray(InfosetArray ia) {
+ ElementMetadata md = ia.metadata();
+ assert (md.isArray());
+ assert (!arrayWriterStack.empty());
+ // FIXME: How do we end/close-out an array?
+ // note that each array instance, when the instance is a map, must have
+ // save called after it is written to the array but that happens
+ // in endComplex events since it must be called not once per array, but
+ // once per array item.
+ arrayWriterStack.pop();
+ }
+
+ private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+ PrimitiveType dafType = ise.metadata().primitiveType();
+ TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+ assert(drillType == cm.type());
+ switch (drillType) {
+ case INT: {
+ //
+ // FIXME: Javadoc for setObject says "primarily for testing"
+ // So how are we supposed to assign the column value then?
+ // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+ cw.setObject(ise.getInt());
+ break;
+ }
+ case BIGINT: {
+ cw.setObject(ise.getLong());
+ break;
+ }
+ case SMALLINT: {
+ cw.setObject(ise.getShort());
+ break;
+ }
+ case TINYINT: {
+ cw.setObject(ise.getByte());
+ break;
+ }
+// .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
Review Comment:
Please remove...
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilMessageParser.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.daffodil.japi.Diagnostic;
+import org.apache.daffodil.japi.ParseResult;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.japi.io.InputSourceDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * DFDL Daffodil Streaming message parser
+ * <br/>
+ * You construct this providing a DataProcessor obtained from the
+ * DaffodilDataProcessorFactory.
+ * The DataProcessor contains the compiled DFDL schema, ready to use, as
+ * well as whether validation while parsing has been requested.
+ * <br/>
+ * The DataProcessor object may be shared/reused by multiple threads each of which
+ * has its own copy of this class.
+ * This object is, however, stateful, and must not be shared by multiple threads.
+ * <br/>
+ * You must call setInputStream, and setInfosetOutputter before
+ * you call parse().
+ * The input stream and the InfosetOutputter objects are also private to one thread and are stateful
+ * and owned by this object.
+ * Once you have called setInputStream, you should view the input stream as the private property of
+ * this object.
+ * The parse() will invoke the InfosetOutputter's methods to deliver
+ * parsed data, and it may optionally create diagnostics (obtained via getDiagnostics)
+ * indicating which kind they are via isProcessingError, isValidationError.
+ * <br/>
+ * Note that the InfosetOutputter may be called many times before a processing error is detected,
+ * as Daffodil delivers result data incrementally.
+ * <br/>
+ * Validation errors do not affect the InfosetOutputter output calls, but indicate that data was
+ * detected that is invalid.
+ * <br/>
+ * When parse() returns, the parse has ended and one can check for errors/diagnostics.
+ * One can call parse() again if there is still data to consume, which is checked via the
+ * isEOF() method.
+ * <br/>
+ * There are no guarantees about where the input stream is positioned between parse() calls.
+ * In particular, it may not be positioned at the start of the next message, as Daffodil may
+ * have pre-fetched additional bytes from the input stream which it found are not part of the
+ * current infoset, but the next one.
+ * The positioning of the input stream may in fact be somewhere in the middle of a byte,
+ * as Daffodil does not require messages to be of lengths that are in whole byte units.
+ * Hence, once you give the input stream to this object via setInputStream, that input stream is
+ * owned privately by this class for ever after.
+ */
+public class DaffodilMessageParser {
+
+ /**
+ * Constructs the parser using a DataProcessor obtained from
+ * a DaffodilDataProcessorFactory.
+ * @param dp
+ */
+ DaffodilMessageParser(DataProcessor dp) {
+ this.dp = dp;
+ }
+
+ /**
+ * Provide the input stream from which data is to be parsed.
+ * <br/>
+ * This input stream is then owned by this object and becomes part of its state.
+ * <br/>
+ * It is, however, the responsibility of the caller to close this
+ * input stream after the completion of all parse calls.
+ * In particular, if a parse error is considered fatal, then
+ * the caller should close the input stream.
+ * There are advanced error-recovery techniques that may attempt to find
+ * data that can be parsed later in the data stream.
+ * In those cases the input stream would not be closed after a processing error,
+ * but such usage is beyond the scope of this javadoc.
+ * @param inputStream
+ */
+ public void setInputStream(InputStream inputStream) {
+ dis = new InputSourceDataInputStream(inputStream);
+ }
+
+ /**
+ * Provides the InfosetOutputter which will be called to deliver
+ * the Infoset via calls to its methods.
+ * @param outputter
+ */
+ public void setInfosetOutputter(InfosetOutputter outputter) {
+ this.outputter = outputter;
+ }
+
+ /**
+ * Called to pull messages from the data stream.
+ * The message 'Infoset' is delivered by way of calls to the InfosetOutputter's methods.
+ * <br/>
+ * After calling this, one may call isProcessingError, isValidationError, isEOF, and
+ * getDiagnostics.
+ */
+ public void parse() {
+ if (dis == null)
+ throw new IllegalStateException("Input stream must be provided by setInputStream() call.");
+ if (outputter == null)
+ throw new IllegalStateException("InfosetOutputter must be provided by setInfosetOutputter() call.");
+
+ reset();
+ ParseResult res = dp.parse(dis, outputter);
+ isProcessingError = res.isProcessingError();
+ isValidationError = res.isValidationError();
+ diagnostics = res.getDiagnostics();
+ }
+
+ /**
+ * True if the input stream is known to contain no more data.
+ * If the input stream is a true stream, not a file, then temporary unavailability of data
+ * may cause this call to block until the stream is closed from the other end, or data becomes
+ * available.
+ * <br/>
+ * False if the input stream has more data that can be obtained.
+ * It is an error to call parse() after isEOF has returned true.
+ * @return
+ */
+ public boolean isEOF() {
+ return !dis.hasData();
+ }
+
+ /**
+ * True if the parse() call failed with a processing error.
+ * This indicates that the data was not well-formed and could not be
+ * parsed successfully.
+ * <br/>
+ * It is possible for isProcessingError and isValidationError to both be true.
+ * @return
+ */
+ public boolean isProcessingError() { return isProcessingError; }
+
+ /**
+ * True if a validation error occurred during parsing.
+ * After a validation error occurs, parsing may still succeed or fail.
+ * @return
+ */
+ public boolean isValidationError() { return isValidationError; }
+
+ /**
+ * After a parse() call this returns null or a list of 1 or more diagnostics.
+ * <br/>
+ * If isProcessingError or isValidationError are true, then this will contain at least 1 diagnostic.
+ * If both are true this will contain at least 2 diagnostics.
+ * @return
+ */
+ public List<Diagnostic> getDiagnostics() { return diagnostics; }
+ public String getDiagnosticsAsString() {
+ String result = diagnostics.stream()
+ .map(Diagnostic::getMessage)
+ .collect(Collectors.joining("\n"));
+ return result;
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class);
Review Comment:
See above comment about Drill style for classes.
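
For reference, a minimal sketch of the layout being asked for — logger first, then fields, then constructors. The members are the ones already present in this class, just reordered (marking dp final assumes it is only ever assigned in the constructor):

```java
public class DaffodilMessageParser {
  private static final Logger logger = LoggerFactory.getLogger(DaffodilMessageParser.class);

  // State, grouped at the top rather than scattered through the class.
  private final DataProcessor dp;
  private InputSourceDataInputStream dis;
  private InfosetOutputter outputter;
  private List<Diagnostic> diagnostics;
  private boolean isProcessingError;
  private boolean isValidationError;

  DaffodilMessageParser(DataProcessor dp) {
    this.dp = dp;
  }

  // ... methods unchanged ...
}
```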
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+ private final DaffodilFormatConfig dafConfig;
+ private final RowSetLoader rowSetLoader;
+ private final CustomErrorContext errorContext;
+ private final DaffodilMessageParser dafParser;
+ private final InputStream dataInputStream;
+
+ static class DaffodilReaderConfig {
+ final DaffodilFormatPlugin plugin;
+ DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+ }
+
+ public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+ errorContext = negotiator.parentErrorContext();
+ this.dafConfig = readerConfig.plugin.getConfig();
+
+ String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+ String rootName = dafConfig.getRootName();
+ String rootNamespace = dafConfig.getRootNamespace();
+ boolean validationMode = dafConfig.getValidationMode();
+
+ URI dfdlSchemaURI;
+ try {
+ dfdlSchemaURI = new URI(schemaURIString);
+ } catch (URISyntaxException e) {
+ throw UserException.validationError(e)
+ .build(logger);
+ }
+
+ FileDescrip file = negotiator.file();
+ DrillFileSystem fs = file.fileSystem();
+ URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ DataProcessor dp;
+ try {
+ dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Create the corresponding Drill schema.
+ // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+ // all of it, hundreds of tables, but all part of the same metadata tree.
+ TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+ // Inform Drill about the schema
+ negotiator.tableSchema(drillSchema, true);
+
+ //
+ // DATA TIME: Next we construct the runtime objects, and open files.
+ //
+ // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+ // actually does the parsing.
+ rowSetLoader = negotiator.build().writer();
+
+ // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+ // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+ DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+ // Now we can setup the dafParser with the outputter it will drive with
+ // the parser-produced infoset.
+ dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+ dafParser.setInfosetOutputter(outputter);
+
+ Path dataPath = file.split().getPath();
+ // Lastly, we open the data stream
+ try {
+ dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to open input file: %s", dataPath.toString()))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Finally, tell daffodil the input data stream.
+ dafParser.setInputStream(dataInputStream);
+ }
+
+
+ /**
+ * This is the core of actual processing - data movement from Daffodil to Drill.
+ * <p>
+ * If there is space in the batch, and there is data available to parse
+ * then this calls the daffodil parser, which parses data, delivering it to the rowWriter
+ * by way of the infoset outputter.
+ * <p>
+ * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+ * a parse error ends execution with a throw.
+ * <p>
+ * Validation errors and other warnings are not errors and are logged but do not cause
+ * parsing to fail/throw.
+ * @return true if there are rows retrieved, false if no rows were retrieved, which means
+ * no more will ever be retrieved (end of data).
+ * @throws RuntimeException on parse errors.
+ */
+ @Override
+ public boolean next() {
+ // Check assumed invariants
+ // We don't know if there is data or not. This could be called on an empty data file.
+ // We DO know that this won't be called if there is no space in the batch for even 1 row.
+ if (dafParser.isEOF()) {
+ return false; // return without even checking for more rows or trying to parse.
+ }
+ while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+ // the predicate is always true once.
+ try {
+ dafParser.parse();
+ if (dafParser.isProcessingError()) {
+ assert(Objects.nonNull(dafParser.getDiagnostics()));
+ throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+ .addContext(errorContext).build(logger);
+ }
+ if (dafParser.isValidationError()) {
+ logger.warn(dafParser.getDiagnosticsAsString());
+ // Note that even if daffodil is set to not validate, validation errors may still occur
+ // from DFDL's "recoverableError" assertions.
+ }
+ } catch (Exception e) {
+ throw UserException.dataReadError(e).message("Error parsing file: " + e.getMessage())
+ .addContext(errorContext).build(logger);
+ }
+ rowSetLoader.save();
+ }
+ int nRows = rowSetLoader.rowCount();
Review Comment:
Do we need this logic here?
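
If the count only feeds the sanity check, one hedged simplification is to lean on the loop invariant directly. parseOneRow below is a hypothetical extraction of the try/catch body above, shown only to keep the sketch short:

```java
@Override
public boolean next() {
  if (dafParser.isEOF()) {
    return false;
  }
  while (rowSetLoader.start() && !dafParser.isEOF()) {
    parseOneRow();       // hypothetical helper wrapping the try/catch above
    rowSetLoader.save();
  }
  // The loop body runs at least once and a failed parse throws,
  // so reaching this point means at least one row was written.
  return true;
}
```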
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+
+import org.apache.daffodil.japi.DataProcessor;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
+
+
+public class DaffodilBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
+ private final DaffodilFormatConfig dafConfig;
+ private final RowSetLoader rowSetLoader;
+ private final CustomErrorContext errorContext;
+ private final DaffodilMessageParser dafParser;
+ private final InputStream dataInputStream;
+
+ static class DaffodilReaderConfig {
+ final DaffodilFormatPlugin plugin;
+ DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
+ this.plugin = plugin;
+ }
+ }
+
+ public DaffodilBatchReader (DaffodilReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+
+ errorContext = negotiator.parentErrorContext();
+ this.dafConfig = readerConfig.plugin.getConfig();
+
+ String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
+ String rootName = dafConfig.getRootName();
+ String rootNamespace = dafConfig.getRootNamespace();
+ boolean validationMode = dafConfig.getValidationMode();
+
+ URI dfdlSchemaURI;
+ try {
+ dfdlSchemaURI = new URI(schemaURIString);
+ } catch (URISyntaxException e) {
+ throw UserException.validationError(e)
+ .build(logger);
+ }
+
+ FileDescrip file = negotiator.file();
+ DrillFileSystem fs = file.fileSystem();
+ URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
+
+
+ DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
+ DataProcessor dp;
+ try {
+ dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Create the corresponding Drill schema.
+ // Note: this could be a very large schema. Think of a large complex RDBMS schema,
+ // all of it, hundreds of tables, but all part of the same metadata tree.
+ TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
+ // Inform Drill about the schema
+ negotiator.tableSchema(drillSchema, true);
+
+ //
+ // DATA TIME: Next we construct the runtime objects, and open files.
+ //
+ // We get the DaffodilMessageParser, which is a stateful driver for daffodil that
+ // actually does the parsing.
+ rowSetLoader = negotiator.build().writer();
+
+ // We construct the Daffodil InfosetOutputter which the daffodil parser uses to
+ // convert infoset event calls to fill in a Drill row via a rowSetLoader.
+ DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
+
+ // Now we can setup the dafParser with the outputter it will drive with
+ // the parser-produced infoset.
+ dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
+ dafParser.setInfosetOutputter(outputter);
+
+ Path dataPath = file.split().getPath();
+ // Lastly, we open the data stream
+ try {
+ dataInputStream = fs.openPossiblyCompressedStream(dataPath);
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to open input file: %s", dataPath.toString()))
+ .addContext(errorContext).addContext(e.getMessage()).build(logger);
+ }
+ // Finally, tell daffodil the input data stream.
+ dafParser.setInputStream(dataInputStream);
+ }
+
+
+ /**
+ * This is the core of actual processing - data movement from Daffodil to Drill.
+ * <p>
+ * If there is space in the batch, and there is data available to parse
+ * then this calls the daffodil parser, which parses data, delivering it to the rowWriter
+ * by way of the infoset outputter.
+ * <p>
+ * Repeats until the rowWriter is full (a batch is full), or there is no more data, or
+ * a parse error ends execution with a throw.
+ * <p>
+ * Validation errors and other warnings are not errors and are logged but do not cause
+ * parsing to fail/throw.
+ * @return true if there are rows retrieved, false if no rows were retrieved, which means
+ * no more will ever be retrieved (end of data).
+ * @throws RuntimeException on parse errors.
+ */
+ @Override
+ public boolean next() {
+ // Check assumed invariants
+ // We don't know if there is data or not. This could be called on an empty data file.
+ // We DO know that this won't be called if there is no space in the batch for even 1 row.
+ if (dafParser.isEOF()) {
+ return false; // return without even checking for more rows or trying to parse.
+ }
+ while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
+ // the predicate is always true once.
+ try {
+ dafParser.parse();
+ if (dafParser.isProcessingError()) {
+ assert(Objects.nonNull(dafParser.getDiagnostics()));
+ throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
+ .addContext(errorContext).build(logger);
+ }
+ if (dafParser.isValidationError()) {
+ logger.warn(dafParser.getDiagnosticsAsString());
+ // Note that even if daffodil is set to not validate, validation errors may still occur
+ // from DFDL's "recoverableError" assertions.
+ }
+ } catch (Exception e) {
+ throw UserException.dataReadError(e).message("Error parsing file: " + e.getMessage())
+ .addContext(errorContext).build(logger);
+ }
+ rowSetLoader.save();
+ }
+ int nRows = rowSetLoader.rowCount();
+ assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out of here.
+ return true;
+ }
+
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(dataInputStream);
Review Comment:
Do we need to close the Daffodil parser, or is it ok to leave that?
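
A sketch of what close() might look like if the answer is that only the stream needs releasing — the assumption (worth confirming against the Daffodil javadocs) being that the compiled DataProcessor is a reusable, shareable object with no close contract of its own:

```java
@Override
public void close() {
  // Assumption to verify: the DataProcessor is shared/reusable and exposes no
  // close() of its own, so the data stream is the only resource this reader owns.
  AutoCloseables.closeSilently(dataInputStream);
}
```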
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+ extends InfosetOutputter {
+
+ private boolean isOriginalRoot() {
+ boolean result = currentTupleWriter() == rowSetWriter;
+ if (result)
+ assert(tupleWriterStack.size() == 1);
+ return result;
+ }
+
+ /**
+ * True if the next startComplex call will be for the
+ * DFDL infoset root element whose children are the columns of
+ * the row set.
+ */
+ private boolean isRootElement = true;
+
+ /**
+ * Stack that is used only if we have sub-structures that are not
+ * simple-type fields of the row.
+ */
+ private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+ private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+ private TupleWriter currentTupleWriter() {
+ return tupleWriterStack.peek();
+ }
+
+ private ArrayWriter currentArrayWriter() {
+ return arrayWriterStack.peek();
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+ private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+ private RowSetLoader rowSetWriter;
+
+ public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+ this.rowSetWriter = writer;
+ this.tupleWriterStack.push(writer);
+ }
+
+ @Override
+ public void reset() {
+ tupleWriterStack.clear();
+ tupleWriterStack.push(rowSetWriter);
+ arrayWriterStack.clear();
+ this.isRootElement = true;
+ checkCleanState();
+ }
+
+ private void checkCleanState() {
+ assert(isOriginalRoot());
+ assert(arrayWriterStack.isEmpty());
+ assert(isRootElement);
+ }
+
+ @Override
+ public void startDocument() {
+ checkCleanState();
+ }
+
+ @Override
+ public void endDocument() {
+ checkCleanState();
+ }
+
+ private String colName(ElementMetadata md) {
+ return DrillDaffodilSchemaVisitor.makeColumnName(md);
+ }
+
+ @Override
+ public void startSimple(InfosetSimpleElement ise) {
+ assert (!isRootElement);
+ ElementMetadata md = ise.metadata();
+ String colName = colName(md);
+ ColumnWriter cw;
+ if (md.isArray()) {
+ // A simple type array
+ assert(!arrayWriterStack.isEmpty());
+ cw = currentArrayWriter().scalar();
+ } else {
+ // A simple element within a map
+ // Note the map itself might be an array
+ // but we don't care about that here.
+ cw = currentTupleWriter().column(colName);
+ }
+ ColumnMetadata cm = cw.schema();
+ assert(cm.isScalar());
+ if (md.isNillable() && ise.isNilled()) {
+ assert cm.isNullable();
+ cw.setNull();
+ } else {
+ convertDaffodilValueToDrillValue(ise, cm, cw);
+ }
+ }
+
+ @Override
+ public void endSimple(InfosetSimpleElement diSimple) {
+ assert (!isRootElement);
+ // do nothing
+ }
+
+ @Override
+ public void startComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ String colName = colName(ce.metadata());
+ if (isRootElement) {
+ assert(isOriginalRoot());
+ // This complex element corresponds to the root element of the
+ // DFDL schema. We don't treat this as a column of the row set.
+ // Rather, its children are the columns of the row set.
+ //
+ // If we do nothing at all here, then we'll start getting
+ // event calls for the children.
+ isRootElement = false;
+ return;
+ }
+ if (md.isArray()) {
+ assert(!arrayWriterStack.isEmpty());
+ // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)
+ tupleWriterStack.push(currentArrayWriter().tuple());
+ } else {
+ tupleWriterStack.push(currentTupleWriter().tuple(colName));
+ }
+ }
+
+ @Override
+ public void endComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ if (isOriginalRoot()) {
+ isRootElement = true;
+ // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+ } else {
+ // it's a map.
+ // We seem to not need to do anything to end the map; taking no action here works.
+ if (md.isArray()) {
+ assert (!arrayWriterStack.isEmpty());
+ currentArrayWriter().save(); // required for map array entries.
+ }
+ tupleWriterStack.pop();
+ }
+ }
+
+ @Override
+ public void startArray(InfosetArray diArray) {
+ ElementMetadata md = diArray.metadata();
+ assert (md.isArray());
+ // DFDL has no notion of an array directly within another array. A named field (map) is necessary
+ // before you can have another array.
+ assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top level row.
+ String colName = colName(md);
+ TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+ ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+ arrayWriterStack.push(aw);
+ }
+
+ @Override
+ public void endArray(InfosetArray ia) {
+ ElementMetadata md = ia.metadata();
+ assert (md.isArray());
+ assert (!arrayWriterStack.empty());
+ // FIXME: How do we end/close-out an array?
+ // note that each array instance, when the instance is a map, must have
+ // save called after it is written to the array but that happens
+ // in endComplex events since it must be called not once per array, but
+ // once per array item.
+ arrayWriterStack.pop();
+ }
+
+ private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+ PrimitiveType dafType = ise.metadata().primitiveType();
+ TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+ assert(drillType == cm.type());
+ switch (drillType) {
+ case INT: {
+ //
+ // FIXME: Javadoc for setObject says "primarily for testing"
+ // So how are we supposed to assign the column value then?
+ // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps?)
+ cw.setObject(ise.getInt());
Review Comment:
For these methods here, I seem to recall that Drill has dedicated set
methods for each data type. Is there a reason why you chose the `setObject()`?
I suspect that if there are methods for each data type, we should probably use
them since setObject probably does a type check anyway.
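
A sketch of the typed-setter alternative: Drill's ScalarWriter interface carries per-type setters such as setInt and setLong. The downcast below assumes the writer handed back for a scalar column is a ScalarWriter (the assert on cm.isScalar() above suggests it should be); if column() instead returns an ObjectWriter wrapper, its scalar() accessor would be the route to the same writer.

```java
ScalarWriter sw = (ScalarWriter) cw; // assumption: scalar columns hand back a ScalarWriter
switch (drillType) {
  case INT:
    sw.setInt(ise.getInt());
    break;
  case BIGINT:
    sw.setLong(ise.getLong());
    break;
  case SMALLINT:
    sw.setInt(ise.getShort()); // widened to int
    break;
  case TINYINT:
    sw.setInt(ise.getByte()); // widened to int
    break;
  default:
    throw new UnsupportedOperationException("Unhandled type: " + drillType);
}
```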
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+ extends InfosetOutputter {
+
+ private boolean isOriginalRoot() {
+ boolean result = currentTupleWriter() == rowSetWriter;
Review Comment:
Nit: Please use formatting consistent with Drill's style for this class.
Usually we put the logger at the beginning, then constructors, etc.
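
A sketch of the requested ordering for this class — same members, no behavior change (rowSetWriter can also become final once the unused private no-arg constructor is dropped):

```java
public class DaffodilDrillInfosetOutputter extends InfosetOutputter {
  private static final Logger logger =
      LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);

  private final RowSetLoader rowSetWriter;
  private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
  private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
  private boolean isRootElement = true;

  public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
    this.rowSetWriter = writer;
    this.tupleWriterStack.push(writer);
  }

  // ... helpers and @Override methods unchanged ...
}
```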
##########
contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.drill.categories.RowSetTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+@Category(RowSetTest.class)
+public class TestDaffodilReader extends ClusterTest {
+
+ String schemaURIRoot = "file:///opt/drill/contrib/format-daffodil/src/test/resources/";
+ @BeforeClass
+ public static void setup() throws Exception {
+ // boilerplate call to start test rig
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ DaffodilFormatConfig formatConfig =
+ new DaffodilFormatConfig(null,
+ "",
+ "",
+ "",
+ false);
+
+ cluster.defineFormat("dfs", "daffodil", formatConfig);
+
+ // Needed to test against compressed files.
+ // Copies data from src/test/resources to the dfs root.
+ dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("schema/"));
+ }
+
+ private String selectRow(String schema, String file) {
+ return "SELECT * FROM table(dfs.`data/" + file + "` " +
+ " (type => 'daffodil'," +
+ " validationMode => 'true', " +
+ " schemaURI => '" + schemaURIRoot + "schema/" + schema + ".dfdl.xsd'," +
+ " rootName => 'row'," +
+ " rootNamespace => null " +
+ "))";
+ }
+
+ /**
+ * This unit test tests a simple data file
+ *
+ * @throws Exception Throw exception if anything goes wrong
+ */
+ @Test
+ public void testSimpleQuery1() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("simple", "data01Int.dat.gz"));
+ RowSet results = query.rowSet();
+ results.print();
+ assertEquals(1, results.rowCount());
+
+ // create the expected metadata and data for this test
+ // metadata first
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("col", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(0x00000101) // aka 257
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testSimpleQuery2() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("simple","data06Int.dat"));
+ RowSet results = query.rowSet();
+ results.print();
+ assertEquals(6, results.rowCount());
+
+ // create the expected metadata and data for this test
+ // metadata first
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("col", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(0x00000101)
+ .addRow(0x00000102)
+ .addRow(0x00000103)
+ .addRow(0x00000104)
+ .addRow(0x00000105)
+ .addRow(0x00000106)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testComplexQuery1() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("complex1", "data02Int.dat"));
+ RowSet results = query.rowSet();
+ results.print();
+ assertEquals(1, results.rowCount());
+
+ RowSetReader rdr = results.reader();
+ rdr.next();
+ String col = rdr.getAsString();
+ assertEquals("{257, 258}", col);
+ assertFalse(rdr.next());
+ results.clear();
+ }
+
+ @Test
+ public void testComplexQuery2() throws Exception {
+
+ QueryBuilder qb = client.queryBuilder();
+ QueryBuilder query = qb.sql(selectRow("complex1", "data06Int.dat"));
+ RowSet results = query.rowSet();
+ results.print();
Review Comment:
Please remove the debug output (the `results.print()` calls) from the unit tests once they are ready.
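For example, `testSimpleQuery1` could drop the `print()` call and the separate row-count assertion, since `RowSetComparison` verifies both the count and the contents (a sketch based on the test above):

```java
@Test
public void testSimpleQuery1() throws Exception {
  RowSet results = client.queryBuilder()
      .sql(selectRow("simple", "data01Int.dat.gz"))
      .rowSet();

  TupleMetadata expectedSchema = new SchemaBuilder()
      .add("col", MinorType.INT)
      .buildSchema();

  RowSet expected = client.rowSetBuilder(expectedSchema)
      .addRow(0x00000101) // 257
      .build();

  // Verifies row count and cell values, then frees both row sets.
  new RowSetComparison(expected).verifyAndClearAll(results);
}
```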
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+ extends InfosetOutputter {
+
+ private boolean isOriginalRoot() {
+ boolean result = currentTupleWriter() == rowSetWriter;
+ if (result)
+ assert(tupleWriterStack.size() == 1);
+ return result;
+ }
+
+ /**
+ * True if the next startComplex call will be for the
+ * DFDL infoset root element whose children are the columns of
+ * the row set.
+ */
+ private boolean isRootElement = true;
+
+ /**
+ * Stack that is used only if we have sub-structures that are not
+ * simple-type fields of the row.
+ */
+ private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+ private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+ private TupleWriter currentTupleWriter() {
+ return tupleWriterStack.peek();
+ }
+
+ private ArrayWriter currentArrayWriter() {
+ return arrayWriterStack.peek();
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+ private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+ private RowSetLoader rowSetWriter;
+
+ public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+ this.rowSetWriter = writer;
+ this.tupleWriterStack.push(writer);
+ }
+
+ @Override
+ public void reset() {
+ tupleWriterStack.clear();
+ tupleWriterStack.push(rowSetWriter);
+ arrayWriterStack.clear();
+ this.isRootElement = true;
+ checkCleanState();
+ }
+
+ private void checkCleanState() {
+ assert(isOriginalRoot());
+ assert(arrayWriterStack.isEmpty());
+ assert(isRootElement);
+ }
+
+ @Override
+ public void startDocument() {
+ checkCleanState();
+ }
+
+ @Override
+ public void endDocument() {
+ checkCleanState();
+ }
+
+ private String colName(ElementMetadata md) {
+ return DrillDaffodilSchemaVisitor.makeColumnName(md);
+ }
+
+ @Override
+ public void startSimple(InfosetSimpleElement ise) {
+ assert (!isRootElement);
+ ElementMetadata md = ise.metadata();
+ String colName = colName(md);
+ ColumnWriter cw;
+ if (md.isArray()) {
+ // A simple type array
+ assert(!arrayWriterStack.isEmpty());
+ cw = currentArrayWriter().scalar();
+ } else {
+ // A simple element within a map
+ // Note the map itself might be an array
+ // but we don't care about that here.
+ cw = currentTupleWriter().column(colName);
+ }
+ ColumnMetadata cm = cw.schema();
+ assert(cm.isScalar());
+ if (md.isNillable() && ise.isNilled()) {
+ assert cm.isNullable();
+ cw.setNull();
+ } else {
+ convertDaffodilValueToDrillValue(ise, cm, cw);
+ }
+ }
+
+ @Override
+ public void endSimple(InfosetSimpleElement diSimple) {
+ assert (!isRootElement);
+ // do nothing
+ }
+
+ @Override
+ public void startComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ String colName = colName(ce.metadata());
+ if (isRootElement) {
+ assert(isOriginalRoot());
+ // This complex element corresponds to the root element of the
+ // DFDL schema. We don't treat this as a column of the row set.
+ // Rather, its children are the columns of the row set.
+ //
+ // If we do nothing at all here, then we'll start getting
+ // event calls for the children.
+ isRootElement = false;
+ return;
+ }
+ if (md.isArray()) {
+ assert(!arrayWriterStack.isEmpty());
+ // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)?
+ tupleWriterStack.push(currentArrayWriter().tuple());
+ } else {
+ tupleWriterStack.push(currentTupleWriter().tuple(colName));
+ }
+ }
+
+ @Override
+ public void endComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ if (isOriginalRoot()) {
+ isRootElement = true;
+ // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+ } else {
+ // It's a map.
+ // Nothing seems to be needed to end the map; taking no action here works.
+ if (md.isArray()) {
+ assert (!arrayWriterStack.isEmpty());
+ currentArrayWriter().save(); // required for map array entries.
+ }
+ tupleWriterStack.pop();
+ }
+ }
+
+ @Override
+ public void startArray(InfosetArray diArray) {
+ ElementMetadata md = diArray.metadata();
+ assert (md.isArray());
+ // DFDL has no notion of an array directly within another array. A named
+ // field (map) is necessary before you can have another array.
+ assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top-level row.
+ String colName = colName(md);
+ TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+ ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+ arrayWriterStack.push(aw);
+ }
+
+ @Override
+ public void endArray(InfosetArray ia) {
+ ElementMetadata md = ia.metadata();
+ assert (md.isArray());
+ assert (!arrayWriterStack.empty());
+ // FIXME: How do we end/close-out an array?
+ // Note that each array instance, when the instance is a map, must have
+ // save() called after it is written to the array, but that happens
+ // in endComplex events since it must be called not once per array, but
+ // once per array item.
+ arrayWriterStack.pop();
+ }
+
+ private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+ PrimitiveType dafType = ise.metadata().primitiveType();
+ TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+ assert(drillType == cm.type());
+ switch (drillType) {
+ case INT: {
+ //
+ // FIXME: Javadoc for setObject says "primarily for testing".
+ // So how are we supposed to assign the column value then?
+ // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps)?
+ cw.setObject(ise.getInt());
+ break;
+ }
+ case BIGINT: {
+ cw.setObject(ise.getLong());
+ break;
+ }
+ case SMALLINT: {
+ cw.setObject(ise.getShort());
+ break;
+ }
+ case TINYINT: {
+ cw.setObject(ise.getByte());
+ break;
+ }
+// .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+// .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+// .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+// .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+// .put("INTEGER", TypeProtos.MinorType.BIGINT)
+// .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+ case BIT: {
+ cw.setObject(ise.getBoolean());
+ break;
+ }
+// .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+// .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires conversion
+// .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires conversion (maybe)
+ case FLOAT8: {
+ cw.setObject(ise.getDouble());
+ break;
+ }
+ case FLOAT4: {
+ cw.setObject(ise.getFloat());
+ break;
+ }
+ case VARBINARY: {
+ cw.setObject(ise.getHexBinary());
+ break;
+ }
+ case VARCHAR: {
+ //
+ // FIXME: VARCHAR is defined in Drill as a UTF-8 string.
+ // Is Drill expecting something other than a Java String in this setObject call?
+ // Should we be mapping Daffodil strings to Drill VAR16CHAR type?
Review Comment:
I believe the answer to this question is yes: we should map Daffodil strings to the Drill `VARCHAR` type. I wouldn't use `VAR16CHAR`, however.
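One way to avoid `setObject` might be to resolve a typed `ScalarWriter` in `startSimple` (via `currentTupleWriter().scalar(colName)` for map fields or `currentArrayWriter().scalar()` for arrays, both part of the vector accessor API) and pass it to the conversion method. A sketch for the string case, untested:

```java
// In startSimple: fetch the typed writer instead of a bare ColumnWriter.
ScalarWriter sw = md.isArray()
    ? currentArrayWriter().scalar()          // simple-type array entry
    : currentTupleWriter().scalar(colName);  // simple field of a map/row

// In the VARCHAR case: Daffodil delivers xs:string as a Java String;
// Drill VARCHAR stores UTF-8 bytes and setString() handles the encoding,
// so plain VARCHAR (not VAR16CHAR) is the right target.
sw.setString(ise.getString());
```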
##########
contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilDrillInfosetOutputter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.daffodil;
+
+import org.apache.daffodil.runtime1.api.ComplexElementMetadata;
+import org.apache.daffodil.runtime1.api.ElementMetadata;
+import org.apache.daffodil.runtime1.api.InfosetArray;
+import org.apache.daffodil.runtime1.api.InfosetComplexElement;
+import org.apache.daffodil.japi.infoset.InfosetOutputter;
+import org.apache.daffodil.runtime1.api.InfosetSimpleElement;
+import org.apache.daffodil.runtime1.api.PrimitiveType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils;
+import org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaVisitor;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Stack;
+
+/**
+ * Adapts Daffodil parser infoset event calls to Drill writer calls
+ * to fill in Drill data rows.
+ */
+public class DaffodilDrillInfosetOutputter
+ extends InfosetOutputter {
+
+ private boolean isOriginalRoot() {
+ boolean result = currentTupleWriter() == rowSetWriter;
+ if (result)
+ assert(tupleWriterStack.size() == 1);
+ return result;
+ }
+
+ /**
+ * True if the next startComplex call will be for the
+ * DFDL infoset root element whose children are the columns of
+ * the row set.
+ */
+ private boolean isRootElement = true;
+
+ /**
+ * Stack that is used only if we have sub-structures that are not
+ * simple-type fields of the row.
+ */
+ private final Stack<TupleWriter> tupleWriterStack = new Stack<>();
+
+ private final Stack<ArrayWriter> arrayWriterStack = new Stack<>();
+
+ private TupleWriter currentTupleWriter() {
+ return tupleWriterStack.peek();
+ }
+
+ private ArrayWriter currentArrayWriter() {
+ return arrayWriterStack.peek();
+ }
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DaffodilDrillInfosetOutputter.class);
+
+ private DaffodilDrillInfosetOutputter() {} // no default constructor
+
+ private RowSetLoader rowSetWriter;
+
+ public DaffodilDrillInfosetOutputter(RowSetLoader writer) {
+ this.rowSetWriter = writer;
+ this.tupleWriterStack.push(writer);
+ }
+
+ @Override
+ public void reset() {
+ tupleWriterStack.clear();
+ tupleWriterStack.push(rowSetWriter);
+ arrayWriterStack.clear();
+ this.isRootElement = true;
+ checkCleanState();
+ }
+
+ private void checkCleanState() {
+ assert(isOriginalRoot());
+ assert(arrayWriterStack.isEmpty());
+ assert(isRootElement);
+ }
+
+ @Override
+ public void startDocument() {
+ checkCleanState();
+ }
+
+ @Override
+ public void endDocument() {
+ checkCleanState();
+ }
+
+ private String colName(ElementMetadata md) {
+ return DrillDaffodilSchemaVisitor.makeColumnName(md);
+ }
+
+ @Override
+ public void startSimple(InfosetSimpleElement ise) {
+ assert (!isRootElement);
+ ElementMetadata md = ise.metadata();
+ String colName = colName(md);
+ ColumnWriter cw;
+ if (md.isArray()) {
+ // A simple type array
+ assert(!arrayWriterStack.isEmpty());
+ cw = currentArrayWriter().scalar();
+ } else {
+ // A simple element within a map
+ // Note the map itself might be an array
+ // but we don't care about that here.
+ cw = currentTupleWriter().column(colName);
+ }
+ ColumnMetadata cm = cw.schema();
+ assert(cm.isScalar());
+ if (md.isNillable() && ise.isNilled()) {
+ assert cm.isNullable();
+ cw.setNull();
+ } else {
+ convertDaffodilValueToDrillValue(ise, cm, cw);
+ }
+ }
+
+ @Override
+ public void endSimple(InfosetSimpleElement diSimple) {
+ assert (!isRootElement);
+ // do nothing
+ }
+
+ @Override
+ public void startComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ String colName = colName(ce.metadata());
+ if (isRootElement) {
+ assert(isOriginalRoot());
+ // This complex element corresponds to the root element of the
+ // DFDL schema. We don't treat this as a column of the row set.
+ // Rather, its children are the columns of the row set.
+ //
+ // If we do nothing at all here, then we'll start getting
+ // event calls for the children.
+ isRootElement = false;
+ return;
+ }
+ if (md.isArray()) {
+ assert(!arrayWriterStack.isEmpty());
+ // FIXME: is this the way to add a complex array child item (i.e., each array item is a map)?
+ tupleWriterStack.push(currentArrayWriter().tuple());
+ } else {
+ tupleWriterStack.push(currentTupleWriter().tuple(colName));
+ }
+ }
+
+ @Override
+ public void endComplex(InfosetComplexElement ce) {
+ ComplexElementMetadata md = ce.metadata();
+ if (isOriginalRoot()) {
+ isRootElement = true;
+ // do nothing else. The row gets closed-out in the DaffodilBatchReader.next() method.
+ } else {
+ // It's a map.
+ // Nothing seems to be needed to end the map; taking no action here works.
+ if (md.isArray()) {
+ assert (!arrayWriterStack.isEmpty());
+ currentArrayWriter().save(); // required for map array entries.
+ }
+ tupleWriterStack.pop();
+ }
+ }
+
+ @Override
+ public void startArray(InfosetArray diArray) {
+ ElementMetadata md = diArray.metadata();
+ assert (md.isArray());
+ // DFDL has no notion of an array directly within another array. A named
+ // field (map) is necessary before you can have another array.
+ assert (currentTupleWriter().type() == ObjectType.TUPLE); // parent is a map, or the top-level row.
+ String colName = colName(md);
+ TupleWriter enclosingParentTupleWriter = currentTupleWriter();
+ ArrayWriter aw = enclosingParentTupleWriter.array(colName);
+ arrayWriterStack.push(aw);
+ }
+
+ @Override
+ public void endArray(InfosetArray ia) {
+ ElementMetadata md = ia.metadata();
+ assert (md.isArray());
+ assert (!arrayWriterStack.empty());
+ // FIXME: How do we end/close-out an array?
+ // Note that each array instance, when the instance is a map, must have
+ // save() called after it is written to the array, but that happens
+ // in endComplex events since it must be called not once per array, but
+ // once per array item.
+ arrayWriterStack.pop();
+ }
+
+ private void convertDaffodilValueToDrillValue(InfosetSimpleElement ise, ColumnMetadata cm, ColumnWriter cw) {
+ PrimitiveType dafType = ise.metadata().primitiveType();
+ TypeProtos.MinorType drillType = DrillDaffodilSchemaUtils.getDrillDataType(dafType);
+ assert(drillType == cm.type());
+ switch (drillType) {
+ case INT: {
+ //
+ // FIXME: Javadoc for setObject says "primarily for testing".
+ // So how are we supposed to assign the column value then?
+ // Is there a way to get from a ColumnWriter to a typed scalar writer (downcast perhaps)?
+ cw.setObject(ise.getInt());
+ break;
+ }
+ case BIGINT: {
+ cw.setObject(ise.getLong());
+ break;
+ }
+ case SMALLINT: {
+ cw.setObject(ise.getShort());
+ break;
+ }
+ case TINYINT: {
+ cw.setObject(ise.getByte());
+ break;
+ }
+// .put("UNSIGNEDLONG", TypeProtos.MinorType.UINT8)
+// .put("UNSIGNEDINT", TypeProtos.MinorType.UINT4)
+// .put("UNSIGNEDSHORT", TypeProtos.MinorType.UINT2)
+// .put("UNSIGNEDBYTE", TypeProtos.MinorType.UINT1)
+// .put("INTEGER", TypeProtos.MinorType.BIGINT)
+// .put("NONNEGATIVEINTEGER", TypeProtos.MinorType.BIGINT)
+ case BIT: {
+ cw.setObject(ise.getBoolean());
+ break;
+ }
+// .put("DATE", TypeProtos.MinorType.DATE) // requires conversion
+// .put("DATETIME", TypeProtos.MinorType.TIMESTAMP) // requires conversion
+// .put("DECIMAL", TypeProtos.MinorType.VARDECIMAL) // requires conversion (maybe)
+ case FLOAT8: {
+ cw.setObject(ise.getDouble());
+ break;
+ }
+ case FLOAT4: {
+ cw.setObject(ise.getFloat());
+ break;
+ }
+ case VARBINARY: {
+ cw.setObject(ise.getHexBinary());
+ break;
+ }
+ case VARCHAR: {
+ //
+ // FIXME: VARCHAR is defined in Drill as a UTF-8 string.
+ // Is Drill expecting something other than a Java String in this setObject call?
+ // Should we be mapping Daffodil strings to Drill VAR16CHAR type?
+ //
+ String s = ise.getString();
+ cw.setObject(s);
+ break;
+ }
+// .put("TIME", TypeProtos.MinorType.TIME) // requires conversion
Review Comment:
Does Daffodil support date, time, timestamp, and interval types? If so, we should include those as well.
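Daffodil does model xs:date, xs:time, and xs:dateTime (backed by ICU calendars), so the switch could grow cases roughly like the sketch below. The getters on `InfosetSimpleElement` and the exact `java.time` objects that Drill's `setObject` accepts are assumptions to verify against both APIs, not confirmed calls:

```java
case DATE: {
  com.ibm.icu.util.Calendar cal = ise.getDate(); // hypothetical getter
  // Drill DATE is a local date; derive it from the epoch millis.
  cw.setObject(java.time.LocalDate.ofEpochDay(cal.getTimeInMillis() / 86_400_000L));
  break;
}
case TIME: {
  com.ibm.icu.util.Calendar cal = ise.getTime(); // hypothetical getter
  long millisOfDay = cal.getTimeInMillis() % 86_400_000L;
  cw.setObject(java.time.LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L));
  break;
}
case TIMESTAMP: {
  com.ibm.icu.util.Calendar cal = ise.getDateTime(); // hypothetical getter
  cw.setObject(java.time.Instant.ofEpochMilli(cal.getTimeInMillis()));
  break;
}
```

Intervals would presumably map to Drill's INTERVALDAY/INTERVALYEAR types, which have no single `java.time` analogue, so they probably deserve a separate discussion.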
> IndexOutOfBoundsException in partition sender when doing streaming aggregate
> with LIMIT
> ----------------------------------------------------------------------------------------
>
> Key: DRILL-2835
> URL: https://issues.apache.org/jira/browse/DRILL-2835
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - RPC
> Affects Versions: 0.8.0
> Reporter: Aman Sinha
> Assignee: Venki Korukanti
> Priority: Major
> Fix For: 0.9.0
>
> Attachments: DRILL-2835-1.patch, DRILL-2835-2.patch
>
>
> Following CTAS run on a TPC-DS 100GB scale factor on a 10-node cluster:
> {code}
> alter session set `planner.enable_hashagg` = false;
> alter session set `planner.enable_multiphase_agg` = true;
> create table dfs.tmp.stream9 as
> select cr_call_center_sk , cr_catalog_page_sk , cr_item_sk , cr_reason_sk ,
> cr_refunded_addr_sk , count(*) from catalog_returns_dri100
> group by cr_call_center_sk , cr_catalog_page_sk , cr_item_sk , cr_reason_sk
> , cr_refunded_addr_sk
> limit 100
> ;
> {code}
> {code}
> Caused by: java.lang.IndexOutOfBoundsException: index: 1023, length: 1
> (expected: range(0, 0))
> at io.netty.buffer.DrillBuf.checkIndexD(DrillBuf.java:200)
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
> at io.netty.buffer.DrillBuf.chk(DrillBuf.java:222)
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
> at io.netty.buffer.DrillBuf.setByte(DrillBuf.java:621)
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:4.0.24.Final]
> at
> org.apache.drill.exec.vector.UInt1Vector$Mutator.set(UInt1Vector.java:342)
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
> at
> org.apache.drill.exec.vector.NullableBigIntVector$Mutator.set(NullableBigIntVector.java:372)
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
> at
> org.apache.drill.exec.vector.NullableBigIntVector.copyFrom(NullableBigIntVector.java:284)
> ~[drill-java-exec-0.9.0-SNAPSHOT-rebuffed.jar:0.9.0-SNAPSHOT]
> at
> org.apache.drill.exec.test.generated.PartitionerGen4$OutgoingRecordBatch.doEval(PartitionerTemplate.java:370)
> ~[na:na]
> at
> org.apache.drill.exec.test.generated.PartitionerGen4$OutgoingRecordBatch.copy(PartitionerTemplate.java:249)
> ~[na:na]
> at
> org.apache.drill.exec.test.generated.PartitionerGen4.doCopy(PartitionerTemplate.java:208)
> ~[na:na]
> at
> org.apache.drill.exec.test.generated.PartitionerGen4.partitionBatch(PartitionerTemplate.java:176)
> ~[na:na]
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)