Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/1212#discussion_r182259926
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
---
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static
org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
+/**
+ * RecordBatch implementation for the lateral join operator. Currently
it's expected LATERAL to co-exists with UNNEST
+ * operator. Both LATERAL and UNNEST will share a contract with each other
defined at {@link LateralContract}
+ */
+public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
+
+ // Input indexes to correctly update the stats
+ private static final int LEFT_INPUT = 0;
+
+ private static final int RIGHT_INPUT = 1;
+
+ // Maximum number records in the outgoing batch
+ private int maxOutputRowCount;
+
+ // Schema on the left side
+ private BatchSchema leftSchema;
+
+ // Schema on the right side
+ private BatchSchema rightSchema;
+
+ // Index in output batch to populate next row
+ private int outputIndex;
+
+ // Current index of record in left incoming which is being processed
+ private int leftJoinIndex = -1;
+
+ // Current index of record in right incoming which is being processed
+ private int rightJoinIndex = -1;
+
+ // flag to keep track if current left batch needs to be processed in
future next call
+ private boolean processLeftBatchInFuture;
+
+ // Keep track if any matching right record was found for current left
index record
+ private boolean matchedRecordFound;
+
+ private boolean useMemoryManager = true;
+
+ /*
****************************************************************************************************************
+ * Public Methods
+ *
****************************************************************************************************************/
+ public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext
context,
+ RecordBatch left, RecordBatch right) throws
OutOfMemoryException {
+ super(popConfig, context, left, right);
+ Preconditions.checkNotNull(left);
+ Preconditions.checkNotNull(right);
+ final int configOutputBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize,
left, right);
+
+ // Initially it's set to default value of 64K and later for each new
output row it will be set to the computed
+ // row count
+ maxOutputRowCount = batchMemoryManager.getOutputRowCount();
+ }
+
+ /**
+ * Method that get's left and right incoming batch and produce the
output batch. If the left incoming batch is
+ * empty then next on right branch is not called and empty batch with
correct outcome is returned. If non empty
+ * left incoming batch is received then it call's next on right branch
to get an incoming and finally produces
+ * output.
+ * @return IterOutcome state of the lateral join batch
+ */
+ @Override
+ public IterOutcome innerNext() {
+
+ // We don't do anything special on FIRST state. Process left batch
first and then right batch if need be
+ IterOutcome childOutcome = processLeftBatch();
+
+ // reset this state after calling processLeftBatch above.
+ processLeftBatchInFuture = false;
+
+ // If the left batch doesn't have any record in the incoming batch
(with OK_NEW_SCHEMA/EMIT) or the state returned
+ // from left side is terminal state then just return the IterOutcome
and don't call next() on right branch
+ if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
+ container.setRecordCount(0);
+ return childOutcome;
+ }
+
+ // Left side has some records in the batch so let's process right batch
+ childOutcome = processRightBatch();
+
+ // reset the left & right outcomes to OK here and send the empty batch
downstream
+ // Assumption being right side will always send OK_NEW_SCHEMA with
empty batch which is what UNNEST will do
+ if (childOutcome == OK_NEW_SCHEMA) {
+ leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
+ rightUpstream = OK;
+ return childOutcome;
+ }
+
+ if (isTerminalOutcome(childOutcome)) {
+ return childOutcome;
+ }
+
+ // If OK_NEW_SCHEMA is seen only on non empty left batch but not on
right batch, then we should setup schema in
+ // output container based on new left schema and old right schema. If
schema change failed then return STOP
+ // downstream
+ if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
+ return STOP;
+ }
+
+ // Setup the references of left, right and outgoing container in
generated operator
+ state = BatchState.NOT_FIRST;
+
+ // Update the memory manager
+ updateMemoryManager(LEFT_INPUT);
+ updateMemoryManager(RIGHT_INPUT);
+
+ // allocate space for the outgoing batch
+ allocateVectors();
+
+ return produceOutputBatch();
+ }
+
+ @Override
+ public void close() {
+ updateBatchMemoryManagerStats();
+ super.close();
+ }
+
+ @Override
+ public int getRecordCount() {
+ return container.getRecordCount();
+ }
+
+ /**
+ * Returns the left side incoming for the Lateral Join. Used by right
branch leaf operator of Lateral
+ * to process the records at leftJoinIndex.
+ *
+ * @return - RecordBatch received as left side incoming
+ */
+ @Override
+ public RecordBatch getIncoming() {
+ Preconditions.checkState (left != null, "Retuning null left batch.
It's unexpected since right side will only be " +
+ "called iff there is any valid left batch");
+ return left;
+ }
+
+ /**
+ * Returns the current row index which the calling operator should
process in current incoming left record batch.
+ * LATERAL should never return it as -1 since that indicated current
left batch is empty and LATERAL will never
+ * call next on right side with empty left batch
+ *
+ * @return - int - index of row to process.
+ */
+ @Override
+ public int getRecordIndex() {
+ Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
+ String.format("Left join index: %d is out of bounds: %d",
leftJoinIndex, left.getRecordCount()));
+ return leftJoinIndex;
+ }
+
+ /**
+ * Returns the current {@link
org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming
batch
+ */
+ @Override
+ public IterOutcome getLeftOutcome() {
+ return leftUpstream;
+ }
+
+ /*
****************************************************************************************************************
+ * Protected Methods
+ *
****************************************************************************************************************/
+
+ /**
+ * Method to get left and right batch during build schema phase for
{@link LateralJoinBatch}. If left batch sees a
+ * failure outcome then we don't even call next on right branch, since
there is no left incoming.
+ * @return true if both the left/right batch was received without
failure outcome.
+ * false if either of batch is received with failure outcome.
+ */
+ @Override
+ protected boolean prefetchFirstBatchFromBothSides() {
+ // Left can get batch with zero or more records with OK_NEW_SCHEMA
outcome as first batch
+ leftUpstream = next(0, left);
+
+ boolean validBatch = setBatchState(leftUpstream);
+
+ if (validBatch) {
+ rightUpstream = next(1, right);
+ validBatch = setBatchState(rightUpstream);
+ }
+
+ // EMIT outcome is not expected as part of first batch from either side
+ if (leftUpstream == EMIT || rightUpstream == EMIT) {
+ state = BatchState.STOP;
+ throw new IllegalStateException("Unexpected IterOutcome.EMIT
received either from left or right side in " +
+ "buildSchema phase");
+ }
+ return validBatch;
+ }
+
+ /**
+ * Prefetch a batch from left and right branch to know about the schema
of each side. Then adds value vector in
+ * output container based on those schemas. For this phase LATERAL
always expect's an empty batch from right side
+ * which UNNEST should abide by.
+ *
+ * @throws SchemaChangeException if batch schema was changed during
execution
+ */
+ @Override
+ protected void buildSchema() throws SchemaChangeException {
+ // Prefetch a RecordBatch from both left and right branch
+ if (!prefetchFirstBatchFromBothSides()) {
+ return;
+ }
+ Preconditions.checkState(right.getRecordCount() == 0, "Unexpected
non-empty first right batch received");
+
+ // Update the record memory manager
+ updateMemoryManager(LEFT_INPUT);
+ updateMemoryManager(RIGHT_INPUT);
+
+ // Setup output container schema based on known left and right schema
+ setupNewSchema();
+
+ // Release the vectors received from right side
+ VectorAccessibleUtilities.clear(right);
+
+ // Set join index as invalid (-1) if the left side is empty, else set
it to 0
+ leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
+ rightJoinIndex = -1;
+
+ // Reset the left side of the IterOutcome since for this call,
OK_NEW_SCHEMA will be returned correctly
+ // by buildSchema caller and we should treat the batch as received
with OK outcome.
+ leftUpstream = OK;
+ rightUpstream = OK;
+ }
+
+ @Override
+ protected void killIncoming(boolean sendUpstream) {
+ this.left.kill(sendUpstream);
+ // Reset the left side outcome as STOP since as part of right kill
when UNNEST will ask IterOutcome of left incoming
+ // from LATERAL and based on that it can make decision if the kill is
coming from downstream to LATERAL or upstream
+ // to LATERAL. Like LIMIT operator being present downstream to LATERAL
or upstream to LATERAL and downstream to
+ // UNNEST.
+ leftUpstream = STOP;
+ this.right.kill(sendUpstream);
+ }
+
+ /*
****************************************************************************************************************
+ * Private Methods
+ *
****************************************************************************************************************/
+
+ private boolean handleSchemaChange() {
+ try {
+ stats.startSetup();
+ logger.debug(String.format("Setting up new schema based on incoming
batch. Old output schema: %s",
+ container.getSchema()));
+ setupNewSchema();
+ return true;
+ } catch (SchemaChangeException ex) {
+ logger.error("Failed to handle schema change hence killing the
query");
+ context.getExecutorState().fail(ex);
+ left.kill(true); // Can have exchange receivers on left so called
with true
+ right.kill(false); // Won't have exchange receivers on right side
+ return false;
+ } finally {
+ stats.stopSetup();
+ }
+ }
+
+ private boolean isTerminalOutcome(IterOutcome outcome) {
+ return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome ==
NONE);
+ }
+
+ /**
+ * Process left incoming batch with different {@link
org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
+ * called from main {@link LateralJoinBatch#innerNext()} block with each
next() call from upstream operator. Also
+ * when we populate the outgoing container then this method is called to
get next left batch if current one is
+ * fully processed. It calls next() on left side until we get a
non-empty RecordBatch. OR we get either of
+ * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome.
+ * @return IterOutcome after processing current left batch
+ */
+ private IterOutcome processLeftBatch() {
+
+ boolean needLeftBatch = leftJoinIndex == -1;
+
+ // If left batch is empty
+ while (needLeftBatch) {
+ leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) :
leftUpstream;
+ final boolean emptyLeftBatch = left.getRecordCount() <=0;
+ logger.trace("Received a left batch and isEmpty: {}",
emptyLeftBatch);
+
+ switch (leftUpstream) {
+ case OK_NEW_SCHEMA:
+ // This OK_NEW_SCHEMA is received post build schema phase and
from left side
+ // If schema didn't actually changed then just handle it as OK
outcome. This is fine since it is not setting
+ // up any incoming vector references in setupNewSchema. While
copying the records it always work on latest
+ // incoming vector.
+ if (!isSchemaChanged(left.getSchema(), leftSchema)) {
+ logger.warn(String.format("New schema received from left side
is same as previous known left schema. " +
+ "Ignoring this schema change. Old Left Schema: %s, New Left
Schema: %s", leftSchema, left.getSchema()));
+
+ // Current left batch is empty and schema didn't changed as
well, so let's get next batch and loose
+ // OK_NEW_SCHEMA outcome
+ processLeftBatchInFuture = false;
+ if (emptyLeftBatch) {
+ continue;
+ } else {
+ leftUpstream = OK;
+ }
+ } else if (outputIndex > 0) { // can only reach here from
produceOutputBatch
+ // This means there is already some records from previous join
inside left batch
+ // So we need to pass that downstream and then handle the
OK_NEW_SCHEMA in subsequent next call
+ processLeftBatchInFuture = true;
+ return OK_NEW_SCHEMA;
+ }
+
+ // If left batch is empty with actual schema change then just
rebuild the output container and send empty
+ // batch downstream
+ if (emptyLeftBatch) {
+ if (handleSchemaChange()) {
+ leftJoinIndex = -1;
+ return OK_NEW_SCHEMA;
+ } else {
+ return STOP;
+ }
+ } // else - setup the new schema information after getting it
from right side too.
+ case OK:
+ // With OK outcome we will keep calling next until we get a
batch with >0 records
+ if (emptyLeftBatch) {
+ leftJoinIndex = -1;
+ continue;
+ } else {
+ leftJoinIndex = 0;
+ }
+ break;
+ case EMIT:
+ // don't call next on right batch
+ if (emptyLeftBatch) {
+ leftJoinIndex = -1;
+ return EMIT;
+ } else {
+ leftJoinIndex = 0;
+ }
+ break;
+ case OUT_OF_MEMORY:
+ case NONE:
+ case STOP:
+ // Not using =0 since if outgoing container is empty then no
point returning anything
+ if (outputIndex > 0) { // can only reach here from
produceOutputBatch
+ processLeftBatchInFuture = true;
+ }
+ return leftUpstream;
+ case NOT_YET:
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException ex) {
+ logger.debug("Thread interrupted while sleeping to call next
on left branch of LATERAL since it " +
+ "received NOT_YET");
+ }
+ break;
+ }
+ needLeftBatch = leftJoinIndex == -1;
+ }
+ return leftUpstream;
+ }
+
+ /**
+ * Process right incoming batch with different {@link
org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
+ * called from main {@link LateralJoinBatch#innerNext()} block with each
next() call from upstream operator and if
+ * left batch has some records in it. Also when we populate the outgoing
container then this method is called to
+ * get next right batch if current one is fully processed.
+ * @return IterOutcome after processing current left batch
+ */
+ private IterOutcome processRightBatch() {
+ // Check if we still have records left to process in left incoming
from new batch or previously half processed
+ // batch based on indexes. We are making sure to update leftJoinIndex
and rightJoinIndex correctly. Like for new
+ // batch leftJoinIndex will always be set to zero and once leftSide
batch is fully processed then it will be set
+ // to -1.
+ // Whereas rightJoinIndex is to keep track of record in right batch
being joined with record in left batch.
+ // So when there are cases such that all records in right batch is not
consumed by the output, then rightJoinIndex
+ // will be a valid index. When all records are consumed it will be set
to -1.
+ boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex ==
-1);
+ while (needNewRightBatch) {
+ rightUpstream = next(RIGHT_INPUT, right);
+ switch (rightUpstream) {
+ case OK_NEW_SCHEMA:
+ // We should not get OK_NEW_SCHEMA multiple times for the same
left incoming batch. So there won't be a
+ // case where we get OK_NEW_SCHEMA --> OK (with batch) --->
OK_NEW_SCHEMA --> OK/EMIT fall through
+ //
+ // Right batch with OK_NEW_SCHEMA is always going to be an empty
batch, so let's pass the new schema
+ // downstream and later with subsequent next() call the join
output will be produced
+ Preconditions.checkState(right.getRecordCount() == 0,
+ "Right side batch with OK_NEW_SCHEMA is not empty");
+
+ if (!isSchemaChanged(right.getSchema(), rightSchema)) {
+ logger.warn(String.format("New schema received from right side
is same as previous known right schema. " +
+ "Ignoring this schema change. Old Right schema: %s, New
Right Schema: %s",
+ rightSchema, right.getSchema()));
+ continue;
+ }
+ if (handleSchemaChange()) {
+ container.setRecordCount(0);
+ rightJoinIndex = -1;
+ return OK_NEW_SCHEMA;
+ } else {
+ return STOP;
+ }
+ case OK:
+ case EMIT:
+ // Even if there are no records we should not call next() again
because in case of LEFT join empty batch is
+ // of importance too
+ rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
+ needNewRightBatch = false;
+ break;
+ case OUT_OF_MEMORY:
+ case NONE:
+ case STOP:
+ needNewRightBatch = false;
+ break;
+ case NOT_YET:
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ logger.debug("Thread interrupted while sleeping to call next
on left branch of LATERAL since it " +
+ "received NOT_YET");
+ }
+ break;
+ }
+ }
+ return rightUpstream;
+ }
+
+ /**
+ * Get's the current left and right incoming batch and does the cross
join to fill the output batch. If all the
+ * records in the either or both the batches are consumed then it get's
next batch from that branch depending upon
+ * if output batch still has some space left. If output batch is full
then the output if finalized to be sent
+ * downstream. Subsequent call's knows how to consume previously half
consumed (if any) batches and producing the
+ * output using that.
+ *
+ * @return - IterOutcome to be send along with output batch to
downstream operator
+ */
+ private IterOutcome produceOutputBatch() {
+
+ boolean isLeftProcessed = false;
+
+ // Try to fully pack the outgoing container
+ while (!isOutgoingBatchFull()) {
+ final int previousOutputCount = outputIndex;
+ // invoke the runtime generated method to emit records in the output
batch for each leftJoinIndex
+ crossJoinAndOutputRecords();
+
+ // We have produced some records in outgoing container, hence there
must be a match found for left record
+ if (outputIndex > previousOutputCount) {
+ // Need this extra flag since there can be left join case where
for current leftJoinIndex it receives a right
+ // batch with data, then an empty batch and again another empty
batch with EMIT outcome. If we just use
+ // outputIndex then we will loose the information that few rows
for leftJoinIndex is already produced using
+ // first right batch
+ matchedRecordFound = true;
+ }
+
+ // One right batch might span across multiple output batch. So
rightIndex will be moving sum of all the
+ // output records for this record batch until it's fully consumed.
+ //
+ // Also it can be so that one output batch can contain records from
2 different right batch hence the
+ // rightJoinIndex should move by number of records in output batch
for current right batch only.
+ rightJoinIndex += outputIndex - previousOutputCount;
+ final boolean isRightProcessed = rightJoinIndex == -1 ||
rightJoinIndex >= right.getRecordCount();
+
+ // Check if above join to produce output was based on empty right
batch or
+ // it resulted in right side batch to be fully consumed. In this
scenario only if rightUpstream
+ // is EMIT then increase the leftJoinIndex.
+ // Otherwise it means for the given right batch there is still some
record left to be processed.
+ if (isRightProcessed) {
+ if (rightUpstream == EMIT) {
+ if (!matchedRecordFound && JoinRelType.LEFT ==
popConfig.getJoinType()) {
+ // copy left side in case of LEFT join
+ emitLeft(leftJoinIndex, outputIndex++);
+ }
+ ++leftJoinIndex;
+ // Reset matchedRecord for next left index record
+ matchedRecordFound = false;
+ }
+
+ // Release vectors of right batch. This will happen for both
rightUpstream = EMIT/OK
+ VectorAccessibleUtilities.clear(right);
+ rightJoinIndex = -1;
+ }
+
+ // Check if previous left record was last one, then set
leftJoinIndex to -1
+ isLeftProcessed = leftJoinIndex >= left.getRecordCount();
+ if (isLeftProcessed) {
+ leftJoinIndex = -1;
+ VectorAccessibleUtilities.clear(left);
+ }
+
+ // Check if output batch still has some space
+ if (!isOutgoingBatchFull()) {
+ // Check if left side still has records or not
+ if (isLeftProcessed) {
+ // The current left batch was with EMIT/OK_NEW_SCHEMA outcome,
then return output to downstream layer before
+ // getting next batch
+ if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
+ break;
+ } else {
+ logger.debug("Output batch still has some space left, getting
new batches from left and right");
+ // Get both left batch and the right batch and make sure
indexes are properly set
+ leftUpstream = processLeftBatch();
+
+ if (processLeftBatchInFuture) {
+ logger.debug("Received left batch with outcome {} such that
we have to return the current outgoing " +
+ "batch and process the new batch in subsequent next call",
leftUpstream);
+ // We should return the current output batch with OK outcome
and don't reset the leftUpstream
+ finalizeOutputContainer();
+ return OK;
+ }
+
+ // If left batch received a terminal outcome then don't call
right batch
+ if (isTerminalOutcome(leftUpstream)) {
+ finalizeOutputContainer();
+ return leftUpstream;
+ }
+
+ // If we have received the left batch with EMIT outcome and is
empty then we should return previous output
+ // batch with EMIT outcome
+ if (leftUpstream == EMIT && left.getRecordCount() == 0) {
+ isLeftProcessed = true;
+ break;
+ }
+
+ // Update the batch memory manager to use new left incoming
batch
+ updateMemoryManager(LEFT_INPUT);
+ }
+ }
+
+ // If we are here it means one of the below:
+ // 1) Either previous left batch was not fully processed and it
came with OK outcome. There is still some space
+ // left in outgoing batch so let's get next right batch.
+ // 2) OR previous left & right batch was fully processed and it
came with OK outcome. There is space in outgoing
+ // batch. Now we have got new left batch with OK outcome. Let's
get next right batch
+ //
+ // It will not hit OK_NEW_SCHEMA since left side have not seen
that outcome
+ rightUpstream = processRightBatch();
+ Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA,
"Unexpected schema change in right branch");
+
+ if (isTerminalOutcome(rightUpstream)) {
+ finalizeOutputContainer();
+ return rightUpstream;
+ }
+
+ // Update the batch memory manager to use new right incoming batch
+ updateMemoryManager(RIGHT_INPUT);
+ }
+ } // output batch is full to its max capacity
+
+ finalizeOutputContainer();
+
+ // Check if output batch was full and left was fully consumed or not.
Since if left is not consumed entirely
+ // but output batch is full, then if the left batch came with EMIT
outcome we should send this output batch along
+ // with OK outcome not with EMIT. Whereas if output is full and left
is also fully consumed then we should send
+ // EMIT outcome.
+ if (leftUpstream == EMIT && isLeftProcessed) {
+ logger.debug("Sending current output batch with EMIT outcome since
left is received with EMIT and is fully " +
+ "consumed in output batch");
+ return EMIT;
+ }
+
+ if (leftUpstream == OK_NEW_SCHEMA) {
+ // return output batch with OK_NEW_SCHEMA and reset the state to OK
+ logger.debug("Sending current output batch with OK_NEW_SCHEMA and
resetting the left outcome to OK for next set" +
+ " of batches");
+ leftUpstream = OK;
+ return OK_NEW_SCHEMA;
+ }
+ return OK;
+ }
+
+ /**
+ * Finalizes the current output container with the records produced so
far before sending it downstream
+ */
+ private void finalizeOutputContainer() {
+ VectorAccessibleUtilities.setValueCount(container, outputIndex);
+
+ // Set the record count in the container
+ container.setRecordCount(outputIndex);
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+ batchMemoryManager.updateOutgoingStats(outputIndex);
+ logger.debug("Number of records emitted: " + outputIndex);
+
+ // Update the output index for next output batch to zero
+ outputIndex = 0;
+ }
+
+ /**
+ * Check if the schema changed between provided newSchema and oldSchema.
It relies on
+ * {@link BatchSchema#isEquivalent(BatchSchema)}.
+ * @param newSchema - New Schema information
+ * @param oldSchema - - New Schema information to compare with
+ *
+ * @return - true - if newSchema is not same as oldSchema
+ * - false - if newSchema is same as oldSchema
+ */
+ private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema
oldSchema) {
+ return (newSchema == null || oldSchema == null) ||
!newSchema.isEquivalent(oldSchema);
+ }
+
+ /**
+ * Validate if the input schema is not null and doesn't contain any
Selection Vector.
+ * @param schema - input schema to verify
+ * @return - true: valid input schema
+ * false: invalid input schema
+ */
+ private boolean verifyInputSchema(BatchSchema schema) {
+
+ boolean isValid = true;
+ if (schema == null) {
+ logger.error("Null schema found for the incoming batch");
+ isValid = false;
+ } else {
+ final BatchSchema.SelectionVectorMode svMode =
schema.getSelectionVectorMode();
+ if (svMode != BatchSchema.SelectionVectorMode.NONE) {
+ logger.error("Incoming batch schema found with selection vector
which is not supported. SVMode: {}",
+ svMode.toString());
+ isValid = false;
+ }
+ }
+ return isValid;
+ }
+
+ /**
+ * Helps to create the outgoing container vectors based on known left
and right batch schemas
+ * @throws SchemaChangeException
+ */
+ private void setupNewSchema() throws SchemaChangeException {
+
+ logger.debug(String.format("Setting up new schema based on incoming
batch. New left schema: %s" +
+ " and New right schema: %s", left.getSchema(), right.getSchema()));
+
+ // Clear up the container
+ container.clear();
+ leftSchema = left.getSchema();
+ rightSchema = right.getSchema();
+
+ if (!verifyInputSchema(leftSchema)) {
+ throw new SchemaChangeException("Invalid Schema found for left
incoming batch");
+ }
+
+ if (!verifyInputSchema(rightSchema)) {
+ throw new SchemaChangeException("Invalid Schema found for right
incoming batch");
+ }
+
--- End diff --
In the case of other joins, the planner takes care of renaming columns that have
the same name on the left and right sides of the join. I would expect the same
thing to happen for Lateral Join as well, rather than an explicit check here.
---