sohami closed pull request #1518: DRILL-6824: Handle schema changes in 
MapRDBJsonRecordReader
URL: https://github.com/apache/drill/pull/1518
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged,
it is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub would not
otherwise display its diff, so the diff is supplied below:

diff --git 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 0be44e84b09..5b849ea0508 100644
--- 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ 
b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -45,6 +45,7 @@
 import org.apache.drill.exec.vector.complex.fn.JsonReaderUtils;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.fs.Path;
+import org.ojai.Document;
 import org.ojai.DocumentReader;
 import org.ojai.DocumentStream;
 import org.ojai.FieldPath;
@@ -77,6 +78,7 @@
 
 public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private static final Logger logger = 
LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
+  protected enum SchemaState {SCHEMA_UNKNOWN, SCHEMA_INIT, SCHEMA_CHANGE};
 
   protected static final FieldPath[] ID_ONLY_PROJECTION = { ID_FIELD };
 
@@ -94,16 +96,19 @@
   private OperatorContext operatorContext;
   protected VectorContainerWriter vectorWriter;
   private DBDocumentReaderBase reader;
+  Document document;
+  protected OutputMutator vectorWriterMutator;
 
   private DrillBuf buffer;
 
   private DocumentStream documentStream;
 
   private Iterator<DocumentReader> documentReaderIterators;
+  private Iterator<Document> documentIterator;
 
   private boolean includeId;
   private boolean idOnly;
-
+  private SchemaState schemaState;
   private boolean projectWholeDocument;
   private FieldProjector projector;
 
@@ -121,11 +126,16 @@
   protected OjaiValueWriter valueWriter;
   protected DocumentReaderVectorWriter documentWriter;
   protected int maxRecordsToRead = -1;
+  protected DBDocumentReaderBase lastDocumentReader;
+  protected Document lastDocument;
 
   public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, 
MapRDBFormatPlugin formatPlugin,
                                 List<SchemaPath> projectedColumns, 
FragmentContext context, int maxRecords) {
     this(subScanSpec, formatPlugin, projectedColumns, context);
     this.maxRecordsToRead = maxRecords;
+    this.lastDocumentReader = null;
+    this.lastDocument = null;
+    this.schemaState = SchemaState.SCHEMA_UNKNOWN;
   }
 
   protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, 
MapRDBFormatPlugin formatPlugin,
@@ -264,34 +274,40 @@ protected boolean getIgnoreSchemaChange() {
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
     this.vectorWriter = new VectorContainerWriter(output, unionEnabled);
+    this.vectorWriterMutator = output;
     this.operatorContext = context;
 
     try {
       table.setOption(TableOption.EXCLUDEID, !includeId);
       documentStream = table.find(condition, scannedFields);
-      documentReaderIterators = documentStream.documentReaders().iterator();
-
-      if (allTextMode) {
-        valueWriter = new AllTextValueWriter(buffer);
-      } else if (readNumbersAsDouble) {
-        valueWriter = new NumbersAsDoubleValueWriter(buffer);
-      } else {
-        valueWriter = new OjaiValueWriter(buffer);
-      }
-
-      if (projectWholeDocument) {
-        documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, 
projector, includeId);
-      } else if (isSkipQuery()) {
-        documentWriter = new RowCountVectorWriter(valueWriter);
-      } else if (idOnly) {
-        documentWriter = new IdOnlyVectorWriter(valueWriter);
-      } else {
-        documentWriter = new FieldTransferVectorWriter(valueWriter);
-      }
+      documentIterator = documentStream.iterator();
+      setupWriter();
     } catch (DBException ex) {
       throw new ExecutionSetupException(ex);
     }
   }
+  /*
+   * Set up the valueWriter and documentWriter based on config options
+   */
+  private void setupWriter() {
+    if (allTextMode) {
+      valueWriter = new AllTextValueWriter(buffer);
+    } else if (readNumbersAsDouble) {
+      valueWriter = new NumbersAsDoubleValueWriter(buffer);
+    } else {
+      valueWriter = new OjaiValueWriter(buffer);
+    }
+
+    if (projectWholeDocument) {
+      documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, 
projector, includeId);
+    } else if (isSkipQuery()) {
+      documentWriter = new RowCountVectorWriter(valueWriter);
+    } else if (idOnly) {
+      documentWriter = new IdOnlyVectorWriter(valueWriter);
+    } else {
+      documentWriter = new FieldTransferVectorWriter(valueWriter);
+    }
+  }
 
   @Override
   public int next() {
@@ -303,33 +319,71 @@ public int next() {
 
     int recordCount = 0;
     reader = null;
+    document = null;
 
     int maxRecordsForThisBatch = this.maxRecordsToRead >= 0?
         Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, 
this.maxRecordsToRead) : BaseValueVector.INITIAL_VALUE_ALLOCATION;
 
+    try {
+      // If the last document caused a SchemaChange create a new output schema 
for this scan batch
+      if (schemaState == SchemaState.SCHEMA_CHANGE && !ignoreSchemaChange) {
+        // Clear the ScanBatch vector container writer/mutator in order to be 
able to generate the new schema
+        vectorWriterMutator.clear();
+        vectorWriter = new VectorContainerWriter(vectorWriterMutator, 
unionEnabled);
+        logger.debug("Encountered schema change earlier use new writer {}", 
vectorWriter.toString());
+        document = lastDocument;
+        setupWriter();
+        if (recordCount < maxRecordsForThisBatch) {
+          vectorWriter.setPosition(recordCount);
+          if (document != null) {
+            reader = (DBDocumentReaderBase) document.asReader();
+            documentWriter.writeDBDocument(vectorWriter, reader);
+            recordCount++;
+          }
+        }
+      }
+    } catch (SchemaChangeException e) {
+      String err_row = reader.getId().asJsonString();
+      if (ignoreSchemaChange) {
+        logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), 
err_row);
+        logger.debug("Stack trace:", e);
+      } else {
+          /* We should not encounter a SchemaChangeException here since this 
is the first document for this
+           * new schema. Something is very wrong - cannot handle any further!
+           */
+        throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", 
err_row);
+      }
+    }
+    schemaState = SchemaState.SCHEMA_INIT;
     while(recordCount < maxRecordsForThisBatch) {
       vectorWriter.setPosition(recordCount);
       try {
-        reader = nextDocumentReader();
-        if (reader == null) {
+        document = nextDocument();
+        if (document == null) {
           break; // no more documents for this reader
         } else {
-          documentWriter.writeDBDocument(vectorWriter, reader);
+          documentWriter.writeDBDocument(vectorWriter, (DBDocumentReaderBase) 
document.asReader());
         }
         recordCount++;
       } catch (UserException e) {
         throw UserException.unsupportedError(e)
             .addContext(String.format("Table: %s, document id: '%s'",
                 table.getPath(),
-                reader == null ? null : IdCodec.asString(reader.getId())))
+                    document.asReader() == null ? null :
+                        
IdCodec.asString(((DBDocumentReaderBase)document.asReader()).getId())))
             .build(logger);
       } catch (SchemaChangeException e) {
-        String err_row = reader.getId().asJsonString();
+        String err_row = 
((DBDocumentReaderBase)document.asReader()).getId().asJsonString();
         if (ignoreSchemaChange) {
           logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), 
err_row);
           logger.debug("Stack trace:", e);
         } else {
-          throw dataReadError(logger, e, "SchemaChangeException for row 
'%s'.", err_row);
+          /* Save the current document for the next iteration. The 
recordCount is not updated, so we
+           * start from this document on the next next() call
+           */
+          lastDocument = document;
+          schemaState = SchemaState.SCHEMA_CHANGE;
+          break;
         }
       }
     }
@@ -367,6 +421,27 @@ protected DBDocumentReaderBase nextDocumentReader() {
     }
   }
 
+  protected Document nextDocument() {
+    final OperatorStats operatorStats = operatorContext == null ? null : 
operatorContext.getStats();
+    try {
+      if (operatorStats != null) {
+        operatorStats.startWait();
+      }
+      try {
+        if (!documentIterator.hasNext()) {
+          return null;
+        } else {
+          return documentIterator.next();
+        }
+      } finally {
+        if (operatorStats != null) {
+          operatorStats.stopWait();
+        }
+      }
+    } catch (DBException e) {
+      throw dataReadError(logger, e);
+    }
+  }
   /*
    * Extracts contiguous named segments from the SchemaPath, starting from the
    * root segment and build the FieldPath from it for projection.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index b25b001bcbd..b78eaa1451d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -68,4 +68,9 @@
    * @return the CallBack object for this mutator
    */
   public CallBack getCallBack();
+
+  /**
+   * Clear this mutator i.e. reset it to pristine condition
+   */
+  public void clear();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a688f37a1fd..0aa8328dd5a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -491,9 +491,11 @@ public CallBack getCallBack() {
       return callBack;
     }
 
+    @Override
     public void clear() {
       regularFieldVectorMap.clear();
       implicitFieldVectorMap.clear();
+      container.clear();
       schemaChanged = false;
     }
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index f29002e26f3..1bd90b37e7d 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -364,6 +364,11 @@ public DrillBuf getManagedBuffer() {
    public CallBack getCallBack() {
      return null;
    }
+
+   @Override
+   public void clear() {
+     // Nothing to do!
+   }
  }
 
   private void validateFooters(final List<Footer> metadata) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to