[ 
https://issues.apache.org/jira/browse/DRILL-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16548711#comment-16548711
 ] 

ASF GitHub Bot commented on DRILL-6606:
---------------------------------------

Ben-Zvi commented on a change in pull request #1384: DRILL-6606: Fixed bug in 
HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.
URL: https://github.com/apache/drill/pull/1384#discussion_r203588145
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##########
 @@ -213,86 +219,185 @@ public int getRecordCount() {
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
-    if (! prefetchFirstBatchFromBothSides()) {
-      return;
+    // We must first get the schemas from upstream operators before we can build
+    // our schema.
+    boolean validSchema = sniffNewSchemas();
+
+    if (validSchema) {
+      // We are able to construct a valid schema from the upstream data.
+      // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
+      state = BatchState.BUILD_SCHEMA;
+    } else {
+      // We were not able to build a valid schema, so we need to set our termination state.
+      final Optional<BatchState> batchStateOpt = getBatchStateTermination();
+      state = batchStateOpt.get(); // There should be a state here.
     }
 
+    // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
+    // we still need to build a dummy schema. This code handles both cases for us.
+    setupOutputContainerSchema();
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
     // Initialize the hash join helper context
-    if (rightUpstream != IterOutcome.NONE) {
+    if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      // We only need the hash tables if we have data on the build side.
       setupHashTable();
     }
-    setupOutputContainerSchema();
+
     try {
       hashJoinProbe = setupHashJoinProbe();
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException(e);
     }
-
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
   }
 
   @Override
   protected boolean prefetchFirstBatchFromBothSides() {
-    leftUpstream = sniffNonEmptyBatch(0, left);
-    rightUpstream = sniffNonEmptyBatch(1, right);
+    if (leftUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
+    }
+
+    if (rightUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
+    }
+
+    buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
+
+    final Optional<BatchState> batchStateOpt = getBatchStateTermination();
+
+    if (batchStateOpt.isPresent()) {
+      // We reached a termination state
+      state = batchStateOpt.get();
+
+      switch (state) {
+        case STOP:
+        case OUT_OF_MEMORY:
+          // Terminate processing now
+          return false;
+        case DONE:
+          // No more data but take operation to completion
+          return true;
+        default:
+          throw new IllegalStateException();
+      }
+    } else {
+      // For build side, use aggregate i.e. average row width across batches
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
 
-    // For build side, use aggregate i.e. average row width across batches
-    batchMemoryManager.update(LEFT_INDEX, 0);
-    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 
-    logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
-    logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+      // Got our first batch(es)
+      state = BatchState.FIRST;
+      return true;
+    }
+  }
 
+  /**
+   * Checks if a termination state has been reached, and returns the appropriate termination state if it has been reached.
+   * @return The termination state if it has been reached. Otherwise empty.
+   */
+  private Optional<BatchState> getBatchStateTermination() {
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return false;
+      return Optional.of(BatchState.STOP);
     }
 
     if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return false;
+      return Optional.of(BatchState.OUT_OF_MEMORY);
     }
 
     if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
-      state = BatchState.DONE;
-      return false;
+      return Optional.of(BatchState.DONE);
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Sniffs all data necessary to construct a schema.
+   * @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
+   */
+  private boolean sniffNewSchemas() {
+    do {
+      // Ask for data until we get a valid result.
+      leftUpstream = next(LEFT_INDEX, probeBatch);
+    } while (leftUpstream == IterOutcome.NOT_YET);
+
+    switch (leftUpstream) {
+      case OK_NEW_SCHEMA:
+        probeSchema = probeBatch.getSchema();
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
+      default:
+        // Termination condition
+    }
+
+    do {
+      // Ask for data until we get a valid result.
+      rightUpstream = next(RIGHT_INDEX, buildBatch);
+    } while (rightUpstream == IterOutcome.NOT_YET);
+
+    switch (rightUpstream) {
+      case OK_NEW_SCHEMA:
+        // We need to have the schema of the build side even when the build side is empty
+        rightSchema = buildBatch.getSchema();
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + rightUpstream);
+      default:
+        // Termination condition
+    }
+
+    if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      // position of the new "column" for keeping the hash values (after the real columns)
+      rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
     }
 
-    state = BatchState.FIRST;  // Got our first batches on both sides
-    return true;
+    return (validSchemaOutcome(leftUpstream, rightUpstream) || validSchemaOutcome(rightUpstream, leftUpstream));
 
 Review comment:
   A little "awkward". Instead, you could add a case for NONE in the second
switch that returns "leftUpstream == OK_NEW_SCHEMA", and maybe set a flag
"validLeft" (for NONE and for OK_NEW_SCHEMA) in the first switch and just
return it.
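
One possible reading of this suggestion, as a rough standalone sketch rather than the actual patch. The class `SchemaSniffSketch` and the enum `Outcome` are hypothetical stand-ins for HashJoinBatch and IterOutcome, and the body of validSchemaOutcome is not shown in this diff, so equivalence with the current return expression is an assumption:

```java
// Hypothetical sketch: not Drill code. Outcome stands in for IterOutcome
// (NOT_YET is omitted because the sniffing loops already wait it out).
class SchemaSniffSketch {
  enum Outcome { OK_NEW_SCHEMA, OK, EMIT, NONE, STOP, OUT_OF_MEMORY }

  // Decides whether a schema can be built from the first outcome of each side.
  static boolean validSchema(Outcome left, Outcome right) {
    boolean validLeft;
    switch (left) {
      case OK_NEW_SCHEMA:
      case NONE:
        // The flag from the suggestion: left is acceptable in both cases.
        validLeft = true;
        break;
      case OK:
      case EMIT:
        throw new IllegalStateException("Unsupported outcome while building schema " + left);
      default:
        // Termination condition (STOP or OUT_OF_MEMORY)
        validLeft = false;
    }

    switch (right) {
      case OK_NEW_SCHEMA:
        return validLeft;
      case NONE:
        // An empty build side is fine only if the probe side has a schema.
        return left == Outcome.OK_NEW_SCHEMA;
      case OK:
      case EMIT:
        throw new IllegalStateException("Unsupported outcome while building schema " + right);
      default:
        // Termination condition (STOP or OUT_OF_MEMORY)
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(validSchema(Outcome.OK_NEW_SCHEMA, Outcome.NONE)); // prints true
    System.out.println(validSchema(Outcome.NONE, Outcome.NONE));          // prints false
  }
}
```

With this shape the final return value falls out of the two switches directly, so the symmetric validSchemaOutcome call would no longer be needed, assuming it encodes the same rule.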

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Hash Join returns incorrect data types when joining subqueries with limit 0
> ---------------------------------------------------------------------------
>
>                 Key: DRILL-6606
>                 URL: https://issues.apache.org/jira/browse/DRILL-6606
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Bohdan Kazydub
>            Assignee: Timothy Farkas
>            Priority: Blocker
>             Fix For: 1.14.0
>
>
> PreparedStatement for query
> {code:sql}
> SELECT l.l_quantity, l.l_shipdate, o.o_custkey
> FROM (SELECT * FROM cp.`tpch/lineitem.parquet` LIMIT 0) l
>     JOIN (SELECT * FROM cp.`tpch/orders.parquet` LIMIT 0) o 
>     ON l.l_orderkey = o.o_orderkey
> LIMIT 0
> {code}
>  is created with wrong types (nullable INTEGER) for all selected columns, no 
> matter what their actual type is. This behavior reproduces only with hash 
> join and is very likely caused by DRILL-6027, as the query worked fine 
> before that feature was implemented.
> To reproduce the problem, put the aforementioned query into the 
> TestPreparedStatementProvider#joinOrderByQuery() test method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
