Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/1212#discussion_r182172836 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java --- @@ -0,0 +1,861 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.join; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.LateralContract; +import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.record.AbstractBinaryRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.JoinBatchMemoryManager; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchSizer; +import org.apache.drill.exec.record.VectorAccessibleUtilities; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; + +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; + +/** + * RecordBatch implementation for the lateral join operator. Currently LATERAL is expected to co-exist with the UNNEST + * operator.
Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract} + */ +public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class); + + // Input indexes to correctly update the stats + private static final int LEFT_INPUT = 0; + + private static final int RIGHT_INPUT = 1; + + // Maximum number records in the outgoing batch + private int maxOutputRowCount; + + // Schema on the left side + private BatchSchema leftSchema; + + // Schema on the right side + private BatchSchema rightSchema; + + // Index in output batch to populate next row + private int outputIndex; + + // Current index of record in left incoming which is being processed + private int leftJoinIndex = -1; + + // Current index of record in right incoming which is being processed + private int rightJoinIndex = -1; + + // flag to keep track if current left batch needs to be processed in future next call + private boolean processLeftBatchInFuture; + + // Keep track if any matching right record was found for current left index record + private boolean matchedRecordFound; + + private boolean useMemoryManager = true; + + /* **************************************************************************************************************** + * Public Methods + * ****************************************************************************************************************/ + public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context, + RecordBatch left, RecordBatch right) throws OutOfMemoryException { + super(popConfig, context, left, right); + Preconditions.checkNotNull(left); + Preconditions.checkNotNull(right); + final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right); + + 
// Initially it's set to default value of 64K and later for each new output row it will be set to the computed + // row count + maxOutputRowCount = batchMemoryManager.getOutputRowCount(); + } + + /** + * Method that gets left and right incoming batch and produces the output batch. If the left incoming batch is + * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty + * left incoming batch is received then it calls next on right branch to get an incoming and finally produces + * output. + * @return IterOutcome state of the lateral join batch + */ + @Override + public IterOutcome innerNext() { + + // We don't do anything special on FIRST state. Process left batch first and then right batch if need be + IterOutcome childOutcome = processLeftBatch(); + + // reset this state after calling processLeftBatch above. + processLeftBatchInFuture = false; + + // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned + // from left side is terminal state then just return the IterOutcome and don't call next() on right branch + if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) { + container.setRecordCount(0); + return childOutcome; + } + + // Left side has some records in the batch so let's process right batch + childOutcome = processRightBatch(); + + // reset the left & right outcomes to OK here and send the empty batch downstream + // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do + if (childOutcome == OK_NEW_SCHEMA) { + leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream; + rightUpstream = OK; + return childOutcome; + } + + if (isTerminalOutcome(childOutcome)) { + return childOutcome; + } + + // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in + // output container based on new left schema and old right schema.
If schema change failed then return STOP + // downstream + if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) { + return STOP; + } + + // Setup the references of left, right and outgoing container in generated operator + state = BatchState.NOT_FIRST; + + // Update the memory manager + updateMemoryManager(LEFT_INPUT); + updateMemoryManager(RIGHT_INPUT); + + // allocate space for the outgoing batch + allocateVectors(); + + return produceOutputBatch(); + } + + @Override + public void close() { + updateBatchMemoryManagerStats(); + super.close(); + } + + @Override + public int getRecordCount() { + return container.getRecordCount(); + } + + /** + * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral + * to process the records at leftJoinIndex. + * + * @return - RecordBatch received as left side incoming + */ + @Override + public RecordBatch getIncoming() { + Preconditions.checkState (left != null, "Retuning null left batch. It's unexpected since right side will only be " + + "called iff there is any valid left batch"); + return left; + } + + /** + * Returns the current row index which the calling operator should process in current incoming left record batch. + * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never + * call next on right side with empty left batch + * + * @return - int - index of row to process. 
+ */ + @Override + public int getRecordIndex() { + Preconditions.checkState (leftJoinIndex < left.getRecordCount(), + String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount())); + return leftJoinIndex; + } + + /** + * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch + */ + @Override + public IterOutcome getLeftOutcome() { + return leftUpstream; + } + + /* **************************************************************************************************************** + * Protected Methods + * ****************************************************************************************************************/ + + /** + * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a + * failure outcome then we don't even call next on right branch, since there is no left incoming. + * @return true if both the left/right batch was received without failure outcome. + * false if either of batch is received with failure outcome. + */ + @Override + protected boolean prefetchFirstBatchFromBothSides() { + // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch + leftUpstream = next(0, left); + + boolean validBatch = setBatchState(leftUpstream); + + if (validBatch) { + rightUpstream = next(1, right); + validBatch = setBatchState(rightUpstream); + } + + // EMIT outcome is not expected as part of first batch from either side + if (leftUpstream == EMIT || rightUpstream == EMIT) { + state = BatchState.STOP; + throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " + + "buildSchema phase"); + } + return validBatch; + } + + /** + * Prefetch a batch from left and right branch to know about the schema of each side. Then adds value vector in + * output container based on those schemas. 
For this phase LATERAL always expect's an empty batch from right side + * which UNNEST should abide by. + * + * @throws SchemaChangeException if batch schema was changed during execution + */ + @Override + protected void buildSchema() throws SchemaChangeException { + // Prefetch a RecordBatch from both left and right branch + if (!prefetchFirstBatchFromBothSides()) { + return; + } + Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received"); + + // Update the record memory manager + updateMemoryManager(LEFT_INPUT); + updateMemoryManager(RIGHT_INPUT); + + // Setup output container schema based on known left and right schema + setupNewSchema(); + + // Release the vectors received from right side + VectorAccessibleUtilities.clear(right); + + // Set join index as invalid (-1) if the left side is empty, else set it to 0 + leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0; + rightJoinIndex = -1; + + // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly + // by buildSchema caller and we should treat the batch as received with OK outcome. + leftUpstream = OK; + rightUpstream = OK; + } + + @Override + protected void killIncoming(boolean sendUpstream) { + this.left.kill(sendUpstream); + // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming + // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream + // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to + // UNNEST. 
+ leftUpstream = STOP; + this.right.kill(sendUpstream); + } + + /* **************************************************************************************************************** + * Private Methods + * ****************************************************************************************************************/ + + private boolean handleSchemaChange() { + try { + stats.startSetup(); + logger.debug(String.format("Setting up new schema based on incoming batch. Old output schema: %s", + container.getSchema())); + setupNewSchema(); + return true; + } catch (SchemaChangeException ex) { + logger.error("Failed to handle schema change hence killing the query"); + context.getExecutorState().fail(ex); + left.kill(true); // Can have exchange receivers on left so called with true + right.kill(false); // Won't have exchange receivers on right side + return false; + } finally { + stats.stopSetup(); + } + } + + private boolean isTerminalOutcome(IterOutcome outcome) { + return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE); + } + + /** + * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is + * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also + * when we populate the outgoing container then this method is called to get next left batch if current one is + * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of + * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome. + * @return IterOutcome after processing current left batch + */ + private IterOutcome processLeftBatch() { + + boolean needLeftBatch = leftJoinIndex == -1; + + // If left batch is empty + while (needLeftBatch) { + leftUpstream = !processLeftBatchInFuture ? 
next(LEFT_INPUT, left) : leftUpstream; + final boolean emptyLeftBatch = left.getRecordCount() <=0; + logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch); + + switch (leftUpstream) { + case OK_NEW_SCHEMA: + // This OK_NEW_SCHEMA is received post build schema phase and from left side + // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting + // up any incoming vector references in setupNewSchema. While copying the records it always work on latest + // incoming vector. + if (!isSchemaChanged(left.getSchema(), leftSchema)) { + logger.warn(String.format("New schema received from left side is same as previous known left schema. " + + "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema())); + + // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose + // OK_NEW_SCHEMA outcome + processLeftBatchInFuture = false; + if (emptyLeftBatch) { + continue; + } else { + leftUpstream = OK; + } + } else if (outputIndex > 0) { // can only reach here from produceOutputBatch + // This means there is already some records from previous join inside left batch + // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call + processLeftBatchInFuture = true; + return OK_NEW_SCHEMA; + } + + // If left batch is empty with actual schema change then just rebuild the output container and send empty + // batch downstream + if (emptyLeftBatch) { + if (handleSchemaChange()) { + leftJoinIndex = -1; + return OK_NEW_SCHEMA; + } else { + return STOP; + } + } // else - setup the new schema information after getting it from right side too. 
+ case OK: + // With OK outcome we will keep calling next until we get a batch with >0 records + if (emptyLeftBatch) { + leftJoinIndex = -1; + continue; + } else { + leftJoinIndex = 0; + } + break; + case EMIT: + // don't call next on right batch + if (emptyLeftBatch) { + leftJoinIndex = -1; + return EMIT; + } else { + leftJoinIndex = 0; + } + break; + case OUT_OF_MEMORY: + case NONE: + case STOP: + // Not using =0 since if outgoing container is empty then no point returning anything + if (outputIndex > 0) { // can only reach here from produceOutputBatch + processLeftBatchInFuture = true; + } + return leftUpstream; + case NOT_YET: + try { + Thread.sleep(5); + } catch (InterruptedException ex) { + logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " + + "received NOT_YET"); + } + break; + } + needLeftBatch = leftJoinIndex == -1; + } + return leftUpstream; + } + + /** + * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is + * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if + * left batch has some records in it. Also when we populate the outgoing container then this method is called to + * get next right batch if current one is fully processed. + * @return IterOutcome after processing current left batch + */ + private IterOutcome processRightBatch() { + // Check if we still have records left to process in left incoming from new batch or previously half processed + // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new + // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then it will be set + // to -1. + // Whereas rightJoinIndex is to keep track of record in right batch being joined with record in left batch. 
+ // So when there are cases such that all records in right batch is not consumed by the output, then rightJoinIndex + // will be a valid index. When all records are consumed it will be set to -1. + boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1); + while (needNewRightBatch) { + rightUpstream = next(RIGHT_INPUT, right); + switch (rightUpstream) { + case OK_NEW_SCHEMA: + // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a + // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through + // + // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema + // downstream and later with subsequent next() call the join output will be produced + Preconditions.checkState(right.getRecordCount() == 0, + "Right side batch with OK_NEW_SCHEMA is not empty"); + + if (!isSchemaChanged(right.getSchema(), rightSchema)) { + logger.warn(String.format("New schema received from right side is same as previous known right schema. " + + "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s", + rightSchema, right.getSchema())); + continue; + } + if (handleSchemaChange()) { + container.setRecordCount(0); + rightJoinIndex = -1; + return OK_NEW_SCHEMA; + } else { + return STOP; + } + case OK: + case EMIT: + // Even if there are no records we should not call next() again because in case of LEFT join empty batch is + // of importance too + rightJoinIndex = (right.getRecordCount() > 0) ? 
0 : -1; + needNewRightBatch = false; + break; + case OUT_OF_MEMORY: + case NONE: + case STOP: + needNewRightBatch = false; + break; + case NOT_YET: + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " + + "received NOT_YET"); + } + break; + } + } + return rightUpstream; + } + + /** + * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the + * records in the either or both the batches are consumed then it get's next batch from that branch depending upon + * if output batch still has some space left. If output batch is full then the output if finalized to be sent + * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the + * output using that. + * + * @return - IterOutcome to be send along with output batch to downstream operator + */ + private IterOutcome produceOutputBatch() { + + boolean isLeftProcessed = false; + + // Try to fully pack the outgoing container + while (!isOutgoingBatchFull()) { + final int previousOutputCount = outputIndex; + // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex + crossJoinAndOutputRecords(); + + // We have produced some records in outgoing container, hence there must be a match found for left record + if (outputIndex > previousOutputCount) { + // Need this extra flag since there can be left join case where for current leftJoinIndex it receives a right + // batch with data, then an empty batch and again another empty batch with EMIT outcome. If we just use + // outputIndex then we will loose the information that few rows for leftJoinIndex is already produced using + // first right batch + matchedRecordFound = true; + } + + // One right batch might span across multiple output batch. 
So rightIndex will be moving sum of all the + // output records for this record batch until it's fully consumed. + // + // Also it can be so that one output batch can contain records from 2 different right batch hence the + // rightJoinIndex should move by number of records in output batch for current right batch only. + rightJoinIndex += outputIndex - previousOutputCount; + final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount(); + + // Check if above join to produce output was based on empty right batch or + // it resulted in right side batch to be fully consumed. In this scenario only if rightUpstream + // is EMIT then increase the leftJoinIndex. + // Otherwise it means for the given right batch there is still some record left to be processed. + if (isRightProcessed) { + if (rightUpstream == EMIT) { + if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) { + // copy left side in case of LEFT join + emitLeft(leftJoinIndex, outputIndex++); + } + ++leftJoinIndex; + // Reset matchedRecord for next left index record + matchedRecordFound = false; + } + + // Release vectors of right batch. 
This will happen for both rightUpstream = EMIT/OK + VectorAccessibleUtilities.clear(right); + rightJoinIndex = -1; + } + + // Check if previous left record was last one, then set leftJoinIndex to -1 + isLeftProcessed = leftJoinIndex >= left.getRecordCount(); + if (isLeftProcessed) { + leftJoinIndex = -1; + VectorAccessibleUtilities.clear(left); + } + + // Check if output batch still has some space + if (!isOutgoingBatchFull()) { + // Check if left side still has records or not + if (isLeftProcessed) { + // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before + // getting next batch + if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) { + break; + } else { + logger.debug("Output batch still has some space left, getting new batches from left and right"); + // Get both left batch and the right batch and make sure indexes are properly set + leftUpstream = processLeftBatch(); + + if (processLeftBatchInFuture) { + logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " + + "batch and process the new batch in subsequent next call", leftUpstream); + // We should return the current output batch with OK outcome and don't reset the leftUpstream + finalizeOutputContainer(); + return OK; + } + + // If left batch received a terminal outcome then don't call right batch + if (isTerminalOutcome(leftUpstream)) { + finalizeOutputContainer(); + return leftUpstream; + } + + // If we have received the left batch with EMIT outcome and is empty then we should return previous output + // batch with EMIT outcome + if (leftUpstream == EMIT && left.getRecordCount() == 0) { + isLeftProcessed = true; + break; + } + + // Update the batch memory manager to use new left incoming batch + updateMemoryManager(LEFT_INPUT); + } + } + + // If we are here it means one of the below: + // 1) Either previous left batch was not fully processed and it came with OK outcome. 
There is still some space + // left in outgoing batch so let's get next right batch. + // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing + // batch. Now we have got new left batch with OK outcome. Let's get next right batch + // + // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome + rightUpstream = processRightBatch(); + Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch"); + + if (isTerminalOutcome(rightUpstream)) { + finalizeOutputContainer(); + return rightUpstream; + } + + // Update the batch memory manager to use new right incoming batch + updateMemoryManager(RIGHT_INPUT); + } + } // output batch is full to its max capacity + + finalizeOutputContainer(); + + // Check if output batch was full and left was fully consumed or not. Since if left is not consumed entirely + // but output batch is full, then if the left batch came with EMIT outcome we should send this output batch along + // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send + // EMIT outcome. 
+ if (leftUpstream == EMIT && isLeftProcessed) { + logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " + + "consumed in output batch"); + return EMIT; + } + + if (leftUpstream == OK_NEW_SCHEMA) { + // return output batch with OK_NEW_SCHEMA and reset the state to OK + logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" + + " of batches"); + leftUpstream = OK; + return OK_NEW_SCHEMA; + } + return OK; + } + + /** + * Finalizes the current output container with the records produced so far before sending it downstream + */ + private void finalizeOutputContainer() { + VectorAccessibleUtilities.setValueCount(container, outputIndex); + + // Set the record count in the container + container.setRecordCount(outputIndex); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + + batchMemoryManager.updateOutgoingStats(outputIndex); + logger.debug("Number of records emitted: " + outputIndex); + + // Update the output index for next output batch to zero + outputIndex = 0; + } + + /** + * Check if the schema changed between provided newSchema and oldSchema. It relies on + * {@link BatchSchema#isEquivalent(BatchSchema)}. + * @param newSchema - New Schema information + * @param oldSchema - - New Schema information to compare with + * + * @return - true - if newSchema is not same as oldSchema + * - false - if newSchema is same as oldSchema + */ + private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) { + return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema); + } + + /** + * Validate if the input schema is not null and doesn't contain any Selection Vector. 
+ * @param schema - input schema to verify + * @return - true: valid input schema + * false: invalid input schema + */ + private boolean verifyInputSchema(BatchSchema schema) { + + boolean isValid = true; + if (schema == null) { + logger.error("Null schema found for the incoming batch"); + isValid = false; + } else { + final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode(); + if (svMode != BatchSchema.SelectionVectorMode.NONE) { + logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}", + svMode.toString()); + isValid = false; + } + } + return isValid; + } + + /** + * Helps to create the outgoing container vectors based on known left and right batch schemas + * @throws SchemaChangeException + */ + private void setupNewSchema() throws SchemaChangeException { + + logger.debug(String.format("Setting up new schema based on incoming batch. New left schema: %s" + + " and New right schema: %s", left.getSchema(), right.getSchema())); + + // Clear up the container + container.clear(); + leftSchema = left.getSchema(); + rightSchema = right.getSchema(); + + if (!verifyInputSchema(leftSchema)) { + throw new SchemaChangeException("Invalid Schema found for left incoming batch"); + } + + if (!verifyInputSchema(rightSchema)) { + throw new SchemaChangeException("Invalid Schema found for right incoming batch"); + } + --- End diff -- Should we add a check here to guard that there is no duplication of names between right and left inputs?
---