Repository: drill
Updated Branches:
  refs/heads/master f8691f4f9 -> 1bb292072


DRILL-6322: Lateral Join: Common changes - Add new iterOutcome, Operatortypes, 
MockRecordBatch for testing

  Added new iterator outcome EMIT, added operators LATERAL_JOIN & UNNEST to 
CoreOperatorType, and added the LateralContract interface

  Implementation of MockRecordBatch to test operator behavior for different 
IterOutcomes. a) Creates a new output container for schema change cases. b) 
Doesn't create a new container for each next() call without a schema change, 
because the setup() method of the operator under test stores references to the 
value vectors received in the first batch and expects those ValueVector objects 
in its incoming batch to stay the same until an OK_NEW_SCHEMA outcome is hit.

This closes #1211


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1bb29207
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1bb29207
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1bb29207

Branch: refs/heads/master
Commit: 1bb292072f249bc8c4334313af8f8537c7ed1622
Parents: f8691f4
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Mon Feb 5 13:12:15 2018 -0800
Committer: Parth Chandra <par...@apache.org>
Committed: Mon Apr 16 17:10:35 2018 -0700

----------------------------------------------------------------------
 .../exec/physical/base/LateralContract.java     |  46 +++++
 .../apache/drill/exec/record/RecordBatch.java   |  27 ++-
 .../exec/physical/impl/MockRecordBatch.java     | 185 +++++++++++++++++++
 .../org/apache/drill/test/OperatorFixture.java  |  28 +--
 .../apache/drill/exec/proto/UserBitShared.java  |  31 +++-
 .../exec/proto/beans/CoreOperatorType.java      |   6 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   2 +
 7 files changed, 306 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1bb29207/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/LateralContract.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/LateralContract.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/LateralContract.java
new file mode 100644
index 0000000..3d6a3c5
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/LateralContract.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * Contract between Lateral Join and any operator on the right side of it 
that consumes the input
+ * from the left side.
+ */
+public interface LateralContract {
+
+  /**
+   * Get a reference to the left side incoming of LateralJoinRecordBatch.
+   * @return the left side incoming {@link RecordBatch}
+   */
+  RecordBatch getIncoming();
+
+  /**
+   * Get the index of the current record in the incoming batch to be processed.
+   * @return the current record index
+   */
+  int getRecordIndex();
+
+  /**
+   * Get the current {@link IterOutcome} of the left incoming batch.
+   */
+  IterOutcome getLeftOutcome();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1bb29207/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 7fc086d..fe7f9e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -198,7 +198,32 @@ public interface RecordBatch extends VectorAccessible {
      *     {@code OUT_OF_MEMORY} to its caller) and call {@code next()} again.
      * </p>
      */
-    OUT_OF_MEMORY
+    OUT_OF_MEMORY,
+
+    /**
+     * Emit record to produce output batches.
+     * <p>
+     *   The call to {@link #next()},
+     *   read zero or more records with no change in schema as compared to last
+     *   time. It is an indication from upstream operator to unblock and
+     *   produce an output batch based on all the records current operator
+     *   possess. The caller should return this outcome to its downstream
+     *   operators except LateralJoinRecordBatch, which will consume any EMIT
+     *   from right branch but will pass through EMIT from left branch.
+     * </p>
+     * <p>
+     *   Caller should produce one or more output record batch based on all the
+     *   current data and restart fresh for any new input. If there are 
multiple
+     *   output batches then caller should send EMIT only with last batch and 
OK
+     *   with all previous batches.
+     *   For example: Hash Join when received EMIT on build side will stop 
build
+     *   side and call next() on probe side until it sees EMIT. On seeing EMIT
+     *   from probe side, it should perform JOIN and produce output batches.
+     *   Later it should clear all the data on both build and probe side of
+     *   input and again start from build side.
+     * </p>
+     */
+    EMIT,
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/1bb29207/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
new file mode 100644
index 0000000..a16e5b8
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class MockRecordBatch implements CloseableRecordBatch {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MockRecordBatch.class);
+
+  // These resources are owned by this RecordBatch
+  private VectorContainer container; // output container that all accessors below delegate to
+  private int currentContainerIndex; // position in allTestContainers of the next input to serve
+  private int currentOutcomeIndex; // position in allOutcomes of the next outcome to return
+  private boolean isDone; // set by kill() and by NONE, STOP or OUT_OF_MEMORY in next()
+
+  // All the below resources are owned by caller
+  private final List<VectorContainer> allTestContainers;
+  private final List<IterOutcome> allOutcomes;
+  private final FragmentContext context;
+  private final OperatorContext oContext;
+  private final BufferAllocator allocator;
+
+  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
+                         List<VectorContainer> testContainers, 
List<IterOutcome> iterOutcomes,
+                         BatchSchema schema) {
+    this.context = context;
+    this.oContext = oContext;
+    this.allocator = oContext.getAllocator(); // allocator is taken from the operator context
+    this.allTestContainers = testContainers; // list is stored by reference, not copied
+    this.container = new VectorContainer(allocator, schema);
+    this.allOutcomes = iterOutcomes; // list is stored by reference, not copied
+    this.currentContainerIndex = 0;
+    this.currentOutcomeIndex = 0;
+    this.isDone = false;
+  }
+
+  @Override
+  public void close() throws Exception { // clears the container and rewinds both indexes
+    container.clear();
+    container.setRecordCount(0);
+    currentContainerIndex = 0;
+    currentOutcomeIndex = 0; // NOTE(review): isDone is not reset here
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return null; // this mock provides no selection vector
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return null; // this mock provides no selection vector
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return container.getSchema();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return container.getRecordCount();
+  }
+
+  @Override
+  public void kill(boolean sendUpstream) { // sendUpstream is not used by this mock
+    isDone = true; // every later next() call returns NONE
+    container.clear();
+    container.setRecordCount(0);
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    return null; // the internal container is not exposed through this accessor
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public IterOutcome next() { // serves the next test container together with the next scripted outcome
+
+    if(isDone) { // once done, only NONE is reported
+      return IterOutcome.NONE;
+    }
+
+    IterOutcome currentOutcome = IterOutcome.OK; // initial value is always overwritten below
+
+    if (currentContainerIndex < allTestContainers.size()) {
+      final VectorContainer input = 
allTestContainers.get(currentContainerIndex);
+      final int recordCount = input.getRecordCount();
+      // We need to do this since the downstream operator expects vector 
reference to be same
+      // after first next call in cases when schema is not changed
+      final BatchSchema inputSchema = input.getSchema();
+      if (!container.getSchema().isEquivalent(inputSchema)) { // replace the container only on schema change
+        container.clear();
+        container = new VectorContainer(allocator, inputSchema);
+      }
+      container.transferIn(input); // NOTE(review): runs before the outcome is read, so also for NOT_YET and terminal outcomes
+      container.setRecordCount(recordCount);
+    }
+
+    if (currentOutcomeIndex < allOutcomes.size()) {
+      currentOutcome = allOutcomes.get(currentOutcomeIndex);
+      ++currentOutcomeIndex;
+    } else {
+      currentOutcome = IterOutcome.NONE; // scripted outcomes are exhausted
+    }
+
+    switch (currentOutcome) {
+      case OK:
+      case OK_NEW_SCHEMA:
+      case EMIT:
+        ++currentContainerIndex; // data was delivered, so move on to the next test container
+        return currentOutcome;
+      case NONE:
+      case STOP:
+      case OUT_OF_MEMORY:
+      // OK_NEW_SCHEMA is handled with the data-carrying outcomes above, not as a terminal outcome
+        isDone = true;
+        container.setRecordCount(0);
+        return currentOutcome;
+      case NOT_YET: // not terminal: isDone stays false and the container index is not advanced
+        container.setRecordCount(0);
+        return currentOutcome;
+      default:
+        throw new UnsupportedOperationException("This state is not supported");
+    }
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    throw new UnsupportedOperationException("MockRecordBatch doesn't support 
gettingWritableBatch yet");
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  public boolean isCompleted() {
+    return isDone;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/1bb29207/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 3e50f75..bb63277 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.test;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.buffer.DrillBuf;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.ClassPathScanner;
@@ -66,11 +66,10 @@ import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.hadoop.security.UserGroupInformation;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import io.netty.buffer.DrillBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Test fixture for operator and (especially) "sub-operator" tests.
@@ -157,6 +156,8 @@ public class OperatorFixture extends BaseFixture implements 
AutoCloseable {
     private final BufferAllocator allocator;
     private final ExecutorService scanExecutorService;
     private final ExecutorService scanDecodeExecutorService;
+    private final List<OperatorContext> contexts = Lists.newLinkedList();
+
 
     private ExecutorState executorState = new 
OperatorFixture.MockExecutorState();
     private ExecutionControls controls;
@@ -251,7 +252,9 @@ public class OperatorFixture extends BaseFixture implements 
AutoCloseable {
         popConfig.getInitialAllocation(),
         popConfig.getMaxAllocation()
       );
-      return new MockOperatorContext(this, childAllocator, popConfig);
+      OperatorContext context = new MockOperatorContext(this, childAllocator, 
popConfig);
+      contexts.add(context);
+      return context;
     }
 
     @Override
@@ -286,6 +289,9 @@ public class OperatorFixture extends BaseFixture implements 
AutoCloseable {
 
     @Override
     public void close() {
+      for(OperatorContext context : contexts) {
+        context.close();
+      }
       bufferManager.close();
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1bb29207/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java 
b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index b2cc57d..770ddb4 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -529,6 +529,14 @@ public final class UserBitShared {
      * <code>FLATTEN = 40;</code>
      */
     FLATTEN(40, 40),
+    /**
+     * <code>LATERAL_JOIN = 41;</code>
+     */
+    LATERAL_JOIN(41, 41),
+    /**
+     * <code>UNNEST = 42;</code>
+     */
+    UNNEST(42, 42),
     ;
 
     /**
@@ -695,6 +703,14 @@ public final class UserBitShared {
      * <code>FLATTEN = 40;</code>
      */
     public static final int FLATTEN_VALUE = 40;
+    /**
+     * <code>LATERAL_JOIN = 41;</code>
+     */
+    public static final int LATERAL_JOIN_VALUE = 41;
+    /**
+     * <code>UNNEST = 42;</code>
+     */
+    public static final int UNNEST_VALUE = 42;
 
 
     public final int getNumber() { return value; }
@@ -742,6 +758,8 @@ public final class UserBitShared {
         case 38: return KAFKA_SUB_SCAN;
         case 39: return KUDU_SUB_SCAN;
         case 40: return FLATTEN;
+        case 41: return LATERAL_JOIN;
+        case 42: return UNNEST;
         default: return null;
       }
     }
@@ -24122,7 +24140,7 @@ public final class UserBitShared {
       "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" +
       
"OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t"
 +
       "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" 
+
-      "REQUESTED\020\006*\244\006\n\020CoreOperatorType\022\021\n\rSING" +
+      "REQUESTED\020\006*\302\006\n\020CoreOperatorType\022\021\n\rSING" +
       "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" 
+
       
"TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" +
       "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" +
@@ -24142,11 +24160,12 @@ public final class UserBitShared {
       "ASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOO" +
       "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_" +
       "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S" +
-      "CAN\020\'\022\013\n\007FLATTEN\020(*g\n\nSaslStatus\022\020\n\014SASL" +
-      "_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PR" 
+
-      
"OGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILE" +
-      "D\020\004B.\n\033org.apache.drill.exec.protoB\rUser" +
-      "BitSharedH\001"
+      
"CAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006"
 +
+      "UNNEST\020**g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000" +
+      
"\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020" +
+      "\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org" 
+
+      ".apache.drill.exec.protoB\rUserBitSharedH" +
+      "\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/1bb29207/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index dc3f158..e7b897d 100644
--- 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -62,7 +62,9 @@ public enum CoreOperatorType implements 
com.dyuproject.protostuff.EnumLite<CoreO
     PCAP_SUB_SCAN(37),
     KAFKA_SUB_SCAN(38),
     KUDU_SUB_SCAN(39),
-    FLATTEN(40);
+    FLATTEN(40),
+    LATERAL_JOIN(41),
+    UNNEST(42);
     
     public final int number;
     
@@ -121,6 +123,8 @@ public enum CoreOperatorType implements 
com.dyuproject.protostuff.EnumLite<CoreO
             case 38: return KAFKA_SUB_SCAN;
             case 39: return KUDU_SUB_SCAN;
             case 40: return FLATTEN;
+            case 41: return LATERAL_JOIN;
+            case 42: return UNNEST;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/1bb29207/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto 
b/protocol/src/main/protobuf/UserBitShared.proto
index d4c401d..0e10d0d 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -328,6 +328,8 @@ enum CoreOperatorType {
   KAFKA_SUB_SCAN = 38;
   KUDU_SUB_SCAN = 39;
   FLATTEN = 40;
+  LATERAL_JOIN = 41;
+  UNNEST = 42;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of 
function signatures.

Reply via email to