DRILL-6323: Lateral Join - Add some debug logs for LateralJoinBatch
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4b6f10e4 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4b6f10e4 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4b6f10e4 Branch: refs/heads/master Commit: 4b6f10e401b3e17fe84e27fc740014d7e113bcd2 Parents: 5ed3196 Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> Authored: Tue Mar 13 16:41:03 2018 -0700 Committer: Parth Chandra <par...@apache.org> Committed: Tue Apr 17 18:16:02 2018 -0700 ---------------------------------------------------------------------- .../physical/impl/join/LateralJoinBatch.java | 49 ++++++++++++++------ 1 file changed, 36 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4b6f10e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index f01bf1c..70ac11b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -89,6 +89,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> private boolean handleSchemaChange() { try { stats.startSetup(); + logger.debug(String.format("Setting up new schema based on incoming batch. Old output schema: %s", + container.getSchema())); setupNewSchema(); return true; } catch (SchemaChangeException ex) { @@ -122,14 +124,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> while (needLeftBatch) { leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream; final boolean emptyLeftBatch = left.getRecordCount() <=0; + logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch); switch (leftUpstream) { case OK_NEW_SCHEMA: // This OK_NEW_SCHEMA is received post build schema phase and from left side - // If schema didn't actually changed then just handle it as OK outcome + // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting + // up any incoming vector references in setupNewSchema. While copying the records it always work on latest + // incoming vector. if (!isSchemaChanged(left.getSchema(), leftSchema)) { - logger.warn("New schema received from left side is same as previous known left schema. Ignoring this " + - "schema change"); + logger.warn(String.format("New schema received from left side is same as previous known left schema. " + + "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema())); // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose // OK_NEW_SCHEMA outcome @@ -139,7 +144,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } else { leftUpstream = OK; } - } else if (outputIndex > 0) { + } else if (outputIndex > 0) { // can only reach here from produceOutputBatch // This means there is already some records from previous join inside left batch // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call processLeftBatchInFuture = true; @@ -150,7 +155,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // batch downstream if (emptyLeftBatch) { if (handleSchemaChange()) { - //container.setRecordCount(0); leftJoinIndex = -1; return OK_NEW_SCHEMA; } else { @@ -170,7 +174,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // don't call next on right batch if (emptyLeftBatch) { leftJoinIndex = -1; - //container.setRecordCount(0); return EMIT; } else { leftJoinIndex = 0; @@ -180,7 +183,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> case NONE: case STOP: // Not using =0 since if outgoing container is empty then no point returning anything - if (outputIndex > 0) { + if (outputIndex > 0) { // can only reach here from produceOutputBatch processLeftBatchInFuture = true; } return leftUpstream; @@ -227,7 +230,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> "Right side batch with OK_NEW_SCHEMA is not empty"); if (!isSchemaChanged(right.getSchema(), rightSchema)) { - logger.warn("New schema received from right side is same as previous known right schema. Ignoring this " + "schema change"); + logger.warn(String.format("New schema received from right side is same as previous known right schema. " + + "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s", + rightSchema, right.getSchema())); continue; } if (handleSchemaChange()) { @@ -278,10 +283,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // reset this state after calling processLeftBatch above. processLeftBatchInFuture = false; - // If the left batch doesn't have any record in the incoming batch or the state returned from - // left side is terminal state then just return the IterOutcome and don't call next() on - // right branch - if (left.getRecordCount() == 0 || isTerminalOutcome(childOutcome)) { + // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned + // from left side is terminal state then just return the IterOutcome and don't call next() on right branch + if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) { container.setRecordCount(0); return childOutcome; } @@ -384,14 +388,18 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> if (outputIndex < MAX_BATCH_SIZE) { // Check if left side still has records or not if (isLeftProcessed) { - // The left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer + // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before + // getting next batch if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) { break; } else { + logger.debug("Output batch still has some space left, getting new batches from left and right"); // Get both left batch and the right batch and make sure indexes are properly set leftUpstream = processLeftBatch(); if (processLeftBatchInFuture) { + logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " + + "batch and process the new batch in subsequent next call", leftUpstream); // We should return the current output batch with OK outcome and don't reset the leftUpstream finalizeOutputContainer(); return OK; @@ -406,6 +414,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // If we have received the left batch with EMIT outcome and is empty then we should return previous output // batch with EMIT outcome if (leftUpstream == EMIT && left.getRecordCount() == 0) { + isLeftProcessed = true; break; } } @@ -435,11 +444,15 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send // EMIT outcome. if (leftUpstream == EMIT && isLeftProcessed) { + logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " + + "consumed in output batch"); return EMIT; } if (leftUpstream == OK_NEW_SCHEMA) { // return output batch with OK_NEW_SCHEMA and reset the state to OK + logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" + + " of batches"); leftUpstream = OK; return OK_NEW_SCHEMA; } @@ -503,6 +516,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> */ private void setupNewSchema() throws SchemaChangeException { + logger.debug(String.format("Setting up new schema based on incoming batch. New left schema: %s" + + " and New right schema: %s", left.getSchema(), right.getSchema())); + // Clear up the container container.clear(); leftSchema = left.getSchema(); @@ -543,6 +559,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> outputIndex = 0; container.setRecordCount(outputIndex); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + logger.debug("Output Schema created {} based on input left schema {} and right schema {}", container.getSchema(), + leftSchema, rightSchema); } /** @@ -697,6 +715,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> * @return - final row index of output batch */ private int crossJoinAndOutputRecords(final int leftIndex, final int rightIndex, final int outIndex) { + logger.trace("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {} and outputIndex: {}", + leftIndex, rightIndex, right.getRecordCount(), outIndex); final int rightRecordCount = right.getRecordCount(); int outBatchIndex = outIndex; @@ -747,6 +767,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass(); final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector(); + logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " + + "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and BaseIndex: {}]", + fromRowIndex, inputValueClass, toRowIndex, outputValueClass, baseVectorIndex); // Copy data from input vector to output vector outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex); }