DRILL-6323: Lateral Join - Add some debug logs for LateralJoinBatch

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4b6f10e4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4b6f10e4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4b6f10e4

Branch: refs/heads/master
Commit: 4b6f10e401b3e17fe84e27fc740014d7e113bcd2
Parents: 5ed3196
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Tue Mar 13 16:41:03 2018 -0700
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Apr 17 18:16:02 2018 -0700

----------------------------------------------------------------------
 .../physical/impl/join/LateralJoinBatch.java    | 49 ++++++++++++++------
 1 file changed, 36 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4b6f10e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index f01bf1c..70ac11b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -89,6 +89,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
+      logger.debug(String.format("Setting up new schema based on incoming 
batch. Old output schema: %s",
+        container.getSchema()));
       setupNewSchema();
       return true;
     } catch (SchemaChangeException ex) {
@@ -122,14 +124,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     while (needLeftBatch) {
       leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : 
leftUpstream;
       final boolean emptyLeftBatch = left.getRecordCount() <=0;
+      logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);
 
       switch (leftUpstream) {
         case OK_NEW_SCHEMA:
           // This OK_NEW_SCHEMA is received post build schema phase and from 
left side
-          // If schema didn't actually changed then just handle it as OK 
outcome
+          // If schema didn't actually changed then just handle it as OK 
outcome. This is fine since it is not setting
+          // up any incoming vector references in setupNewSchema. While 
copying the records it always work on latest
+          // incoming vector.
           if (!isSchemaChanged(left.getSchema(), leftSchema)) {
-            logger.warn("New schema received from left side is same as 
previous known left schema. Ignoring this " +
-              "schema change");
+            logger.warn(String.format("New schema received from left side is 
same as previous known left schema. " +
+              "Ignoring this schema change. Old Left Schema: %s, New Left 
Schema: %s", leftSchema, left.getSchema()));
 
             // Current left batch is empty and schema didn't changed as well, 
so let's get next batch and loose
             // OK_NEW_SCHEMA outcome
@@ -139,7 +144,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
             } else {
               leftUpstream = OK;
             }
-          } else if (outputIndex > 0) {
+          } else if (outputIndex > 0) { // can only reach here from 
produceOutputBatch
             // This means there is already some records from previous join 
inside left batch
             // So we need to pass that downstream and then handle the 
OK_NEW_SCHEMA in subsequent next call
             processLeftBatchInFuture = true;
@@ -150,7 +155,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
           // batch downstream
           if (emptyLeftBatch) {
             if (handleSchemaChange()) {
-              //container.setRecordCount(0);
               leftJoinIndex = -1;
               return OK_NEW_SCHEMA;
             } else {
@@ -170,7 +174,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
           // don't call next on right batch
           if (emptyLeftBatch) {
             leftJoinIndex = -1;
-            //container.setRecordCount(0);
             return EMIT;
           } else {
             leftJoinIndex = 0;
@@ -180,7 +183,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
         case NONE:
         case STOP:
           // Not using =0 since if outgoing container is empty then no point 
returning anything
-          if (outputIndex > 0) {
+          if (outputIndex > 0) { // can only reach here from produceOutputBatch
             processLeftBatchInFuture = true;
           }
           return leftUpstream;
@@ -227,7 +230,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
             "Right side batch with OK_NEW_SCHEMA is not empty");
 
           if (!isSchemaChanged(right.getSchema(), rightSchema)) {
-            logger.warn("New schema received from right side is same as 
previous known right schema. Ignoring this " + "schema change");
+            logger.warn(String.format("New schema received from right side is 
same as previous known right schema. " +
+              "Ignoring this schema change. Old Right schema: %s, New Right 
Schema: %s",
+              rightSchema, right.getSchema()));
             continue;
           }
           if (handleSchemaChange()) {
@@ -278,10 +283,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // reset this state after calling processLeftBatch above.
     processLeftBatchInFuture = false;
 
-    // If the left batch doesn't have any record in the incoming batch or the 
state returned from
-    // left side is terminal state then just return the IterOutcome and don't 
call next() on
-    // right branch
-    if (left.getRecordCount() == 0 || isTerminalOutcome(childOutcome)) {
+    // If the left batch doesn't have any record in the incoming batch (with 
OK_NEW_SCHEMA/EMIT) or the state returned
+    // from left side is terminal state then just return the IterOutcome and 
don't call next() on right branch
+    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
       container.setRecordCount(0);
       return childOutcome;
     }
@@ -384,14 +388,18 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       if (outputIndex < MAX_BATCH_SIZE) {
         // Check if left side still has records or not
         if (isLeftProcessed) {
-          // The left batch was with EMIT/OK_NEW_SCHEMA outcome, then return 
output to downstream layer
+          // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then 
return output to downstream layer before
+          // getting next batch
           if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
             break;
           } else {
+            logger.debug("Output batch still has some space left, getting new 
batches from left and right");
             // Get both left batch and the right batch and make sure indexes 
are properly set
             leftUpstream = processLeftBatch();
 
             if (processLeftBatchInFuture) {
+              logger.debug("Received left batch with outcome {} such that we 
have to return the current outgoing " +
+                "batch and process the new batch in subsequent next call", 
leftUpstream);
               // We should return the current output batch with OK outcome and 
don't reset the leftUpstream
               finalizeOutputContainer();
               return OK;
@@ -406,6 +414,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
             // If we have received the left batch with EMIT outcome and is 
empty then we should return previous output
             // batch with EMIT outcome
             if (leftUpstream == EMIT && left.getRecordCount() == 0) {
+              isLeftProcessed = true;
               break;
             }
           }
@@ -435,11 +444,15 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // with OK outcome not with EMIT. Whereas if output is full and left is 
also fully consumed then we should send
     // EMIT outcome.
     if (leftUpstream == EMIT && isLeftProcessed) {
+      logger.debug("Sending current output batch with EMIT outcome since left 
is received with EMIT and is fully " +
+        "consumed in output batch");
       return EMIT;
     }
 
     if (leftUpstream == OK_NEW_SCHEMA) {
       // return output batch with OK_NEW_SCHEMA and reset the state to OK
+      logger.debug("Sending current output batch with OK_NEW_SCHEMA and 
resetting the left outcome to OK for next set" +
+        " of batches");
       leftUpstream = OK;
       return OK_NEW_SCHEMA;
     }
@@ -503,6 +516,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    */
   private void setupNewSchema() throws SchemaChangeException {
 
+    logger.debug(String.format("Setting up new schema based on incoming batch. 
New left schema: %s" +
+        " and New right schema: %s", left.getSchema(), right.getSchema()));
+
     // Clear up the container
     container.clear();
     leftSchema = left.getSchema();
@@ -543,6 +559,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     outputIndex = 0;
     container.setRecordCount(outputIndex);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    logger.debug("Output Schema created {} based on input left schema {} and 
right schema {}", container.getSchema(),
+      leftSchema, rightSchema);
   }
 
   /**
@@ -697,6 +715,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    * @return - final row index of output batch
    */
   private int crossJoinAndOutputRecords(final int leftIndex, final int 
rightIndex, final int outIndex) {
+    logger.trace("Producing output for leftIndex: {}, rightIndex: {}, 
rightRecordCount: {} and outputIndex: {}",
+      leftIndex, rightIndex, right.getRecordCount(), outIndex);
     final int rightRecordCount = right.getRecordCount();
     int outBatchIndex = outIndex;
 
@@ -747,6 +767,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       final Class<?> outputValueClass = 
this.getSchema().getColumn(outputVectorIndex).getValueClass();
       final ValueVector outputVector = 
this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
 
+      logger.trace("Copying data from incoming batch vector to outgoing batch 
vector. [IncomingBatch: " +
+          "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, 
VectorType: {}) and BaseIndex: {}]",
+        fromRowIndex, inputValueClass, toRowIndex, outputValueClass, 
baseVectorIndex);
       // Copy data from input vector to output vector
       outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex);
     }

Reply via email to