[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/1110


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-14 Thread HanumathRao
Github user HanumathRao commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r168338663
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---
@@ -93,6 +94,21 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     return creator.addMetadata(this, g);
   }
 
+  /**
+   * This method creates a new OrderedMux exchange if mux operators are enabled.
+   * @param child input to the new muxPrel or new SingleMergeExchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  @Override
+  public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException {
+    Prel outPrel = child;
+    if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val) {
--- End diff --

@amansinha100 Thanks for the review. I have made the code changes needed to fix it. Please let me know if anything else is required.


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-13 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r168044397
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---
@@ -93,6 +94,21 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     return creator.addMetadata(this, g);
   }
 
+  /**
+   * This method creates a new OrderedMux exchange if mux operators are enabled.
+   * @param child input to the new muxPrel or new SingleMergeExchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  @Override
+  public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException {
+    Prel outPrel = child;
+    if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val) {
--- End diff --

@HanumathRao I think the ordered mux exchange should be created only when both the mux and ordered_mux flags are enabled. Users who disable the global 'mux' flag would very likely expect all mux exchanges to be disabled.
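A minimal sketch of the gating suggested here, assuming a plain `Map<String, Boolean>` stands in for Drill's `OptionManager` and that `mux` and `ordered_mux` (hypothetical keys) stand in for `PlannerSettings.MUX_EXCHANGE` and `PlannerSettings.ORDERED_MUX_EXCHANGE`. The only point is the `&&`: turning off the global flag also turns off the ordered variant.

```java
import java.util.Map;

public class MuxGatingSketch {
  static final String MUX = "mux";                 // hypothetical key for the global mux flag
  static final String ORDERED_MUX = "ordered_mux"; // hypothetical key for the ordered-mux flag

  /** An ordered mux exchange is created only if the global mux flag AND the ordered-mux flag are on. */
  static boolean shouldCreateOrderedMux(Map<String, Boolean> options) {
    // A missing option is treated as disabled.
    return options.getOrDefault(MUX, false) && options.getOrDefault(ORDERED_MUX, false);
  }
}
```

With this shape, `{mux=false, ordered_mux=true}` yields no ordered mux, which matches what a user who switched off the global flag would expect.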


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-13 Thread HanumathRao
Github user HanumathRao commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r168021043
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
@@ -92,24 +92,19 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
     return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
   }
 
-
-  @Override
-  public final Receiver getReceiver(int minorFragmentId) {
+  protected final List<MinorFragmentEndpoint> getSenders(int minorFragmentId) {
     createSenderReceiverMapping();
 
     List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
 
-    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
+    logger.debug("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders);
--- End diff --

Done. 


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-13 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167988964
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
@@ -92,24 +92,19 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
     return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
   }
 
-
-  @Override
-  public final Receiver getReceiver(int minorFragmentId) {
+  protected final List<MinorFragmentEndpoint> getSenders(int minorFragmentId) {
     createSenderReceiverMapping();
 
     List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
 
-    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
+    logger.debug("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders);
--- End diff --

Use `{}` in place of `%d` and `%s`.
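Why the placeholders matter: SLF4J substitutes `{}` tokens and does not interpret printf-style `%d`/`%s`, so the call in the diff above would likely log the pattern text literally and drop the arguments. The class below is a deliberately simplified, hypothetical imitation of `{}` substitution (no escaping, no special handling of a trailing `Throwable`), not SLF4J's actual formatter.

```java
public class PlaceholderSketch {
  /**
   * Simplified imitation of "{}" substitution: each "{}" is replaced, left to right,
   * by String.valueOf(next argument). Real SLF4J also handles escaping and a trailing
   * Throwable; both are left out here.
   */
  static String format(String pattern, Object... args) {
    StringBuilder out = new StringBuilder();
    int from = 0;
    for (Object arg : args) {
      int at = pattern.indexOf("{}", from);
      if (at < 0) {
        break; // no placeholder left: the remaining arguments are silently dropped
      }
      out.append(pattern, from, at).append(String.valueOf(arg));
      from = at + 2;
    }
    out.append(pattern, from, pattern.length());
    return out.toString();
  }
}
```

For example, `format("Minor fragment {}", 3)` yields `Minor fragment 3`, while `format("Minor fragment %d", 3)` returns the pattern unchanged.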


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-13 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167960149
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---
@@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
   }
 
   @Override
-  public Receiver getReceiver(int minorFragmentId) {
-    createSenderReceiverMapping();
-
-    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
-    if (senders == null || senders.size() <= 0) {
-      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
-    }
-
+  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
+                                         List<MinorFragmentEndpoint> senders, boolean spooling) {
     return new UnorderedReceiver(senderMajorFragmentId, senders, false);
--- End diff --

Consider creating a helper method such as `getSenders()` that returns `List<MinorFragmentEndpoint>`, instead of `getReceiverInternal()`.
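One possible shape of the suggested helper, as a sketch: senders are modelled as `String`s and the receiver-to-sender mapping as a plain `Map` (both stand-ins for Drill's `MinorFragmentEndpoint` list and mapping), and the `createSenderReceiverMapping()` call is omitted. The parent owns lookup and validation; each subclass keeps its own `getReceiver()` and only calls `getSenders()`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GetSendersSketch {
  // Stand-in for the receiver-to-sender mapping built by the parent exchange.
  protected final Map<Integer, List<String>> receiverToSenderMapping = new HashMap<>();

  /** Shared lookup and validation, kept once in the parent class. */
  protected final List<String> getSenders(int minorFragmentId) {
    List<String> senders = receiverToSenderMapping.get(minorFragmentId);
    if (senders == null || senders.isEmpty()) {
      throw new IllegalStateException(
          String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    }
    return senders;
  }

  /** A subclass builds its own receiver and just asks the parent for the senders. */
  static class UnorderedSketch extends GetSendersSketch {
    String getReceiver(int minorFragmentId) {
      return "UnorderedReceiver" + getSenders(minorFragmentId);
    }
  }
}
```

Compared with a `getReceiverInternal()` hook, this keeps every parameter a subclass receives meaningful, so nothing like an unused `spooling` argument can slip in.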


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-13 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167954543
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---
@@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
   }
 
   @Override
-  public Receiver getReceiver(int minorFragmentId) {
-    createSenderReceiverMapping();
-
-    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
-    if (senders == null || senders.size() <= 0) {
-      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
-    }
-
+  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
+                                         List<MinorFragmentEndpoint> senders, boolean spooling) {
     return new UnorderedReceiver(senderMajorFragmentId, senders, false);
--- End diff --

The `spooling` parameter is ignored; is this expected?


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-13 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167953826
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
@@ -90,6 +92,24 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
     return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
   }
 
+
+  @Override
+  public final Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+
+    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
--- End diff --

Use SLF4J parameterized logging (`{}` placeholders) instead of `String.format`.


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-13 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167956002
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java ---
@@ -20,133 +20,41 @@
 import com.google.common.collect.Lists;
 
 import org.apache.drill.exec.planner.physical.ExchangePrel;
-import org.apache.drill.exec.planner.physical.HashPrelUtil;
-import org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
-import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
-import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
-import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.math.BigDecimal;
-import java.util.Collections;
 import java.util.List;
 
 public class InsertLocalExchangeVisitor extends BasePrelVisitor {
-  private final boolean isMuxEnabled;
-  private final boolean isDeMuxEnabled;
-
-
-  public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper {
-    private final RexBuilder rexBuilder;
+  private final OptionManager options;
 
-    public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
-      this.rexBuilder = rexBuilder;
-    }
-
-    @Override
-    public RexNode createCall(String funcName, List inputFields) {
-      final DrillSqlOperator op =
-          new DrillSqlOperator(funcName, inputFields.size(), true, false);
-      return rexBuilder.makeCall(op, inputFields);
+  private static boolean isMuxEnabled(OptionManager options) {
+    if (options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val ||
--- End diff --

Use `return` instead of `if`.
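Illustrating the suggestion with a hypothetical `Map` of flags in place of `OptionManager`: the quoted diff is cut off after `||`, so `other_mux` below is only a placeholder for whatever second option the real `isMuxEnabled` checks.

```java
import java.util.Map;

public class ReturnInsteadOfIfSketch {
  // "mux" and "other_mux" are hypothetical keys; the second operand is not shown in the diff.

  /** Shape under review: an if statement whose only job is to return true or false. */
  static boolean isMuxEnabledWithIf(Map<String, Boolean> options) {
    if (options.getOrDefault("mux", false) || options.getOrDefault("other_mux", false)) {
      return true;
    }
    return false;
  }

  /** Suggested shape: return the boolean expression directly. */
  static boolean isMuxEnabled(Map<String, Boolean> options) {
    return options.getOrDefault("mux", false) || options.getOrDefault("other_mux", false);
  }
}
```

Both methods behave identically; the second simply removes the redundant branch.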


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-12 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167586208
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java ---
@@ -119,7 +119,7 @@ public final void setupReceivers(int majorFragmentId, List rec
   }
 
   @Override
-  public final <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
--- End diff --

Is this change still required?


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-12 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167625240
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
+    super(child);
+    this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
--- End diff --

Consider moving this functionality to the parent class and keeping only the creation of the concrete `MergingReceiver` instance in the subclass.
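A sketch of the template-method structure this comment seems to suggest, with hypothetical names and receivers modelled as strings: the parent's `final getReceiver()` does the sender lookup and validation, and each concrete exchange only implements a factory method for its receiver.

```java
import java.util.List;
import java.util.Map;

public class MuxTemplateSketch {

  /** Parent class: owns the sender lookup and validation; getReceiver() cannot be overridden. */
  abstract static class MuxExchange {
    private final Map<Integer, List<String>> receiverToSenderMapping;

    MuxExchange(Map<Integer, List<String>> receiverToSenderMapping) {
      this.receiverToSenderMapping = receiverToSenderMapping;
    }

    public final String getReceiver(int minorFragmentId) {
      List<String> senders = receiverToSenderMapping.get(minorFragmentId);
      if (senders == null || senders.isEmpty()) {
        throw new IllegalStateException(
            String.format("Failed to find senders for receiver [%d]", minorFragmentId));
      }
      return createReceiver(senders);
    }

    /** The only step a concrete exchange has to supply. */
    protected abstract String createReceiver(List<String> senders);
  }

  /** Ordered variant: would build a merging receiver over the sorted senders. */
  static class OrderedMux extends MuxExchange {
    OrderedMux(Map<Integer, List<String>> mapping) {
      super(mapping);
    }

    @Override
    protected String createReceiver(List<String> senders) {
      return "MergingReceiver" + senders;
    }
  }

  /** Unordered variant: would build a plain unordered receiver. */
  static class UnorderedMux extends MuxExchange {
    UnorderedMux(Map<Integer, List<String>> mapping) {
      super(mapping);
    }

    @Override
    protected String createReceiver(List<String> senders) {
      return "UnorderedReceiver" + senders;
    }
  }
}
```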


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-12 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167587159
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
+    super(child);
+    this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+    if (senders == null || senders.size() <= 0) {
+      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
--- End diff --

Use SLF4J parameterized logging, remove the `isDebugEnabled()` check, and consider moving the logging before the exception is thrown.
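SLF4J is not available in a self-contained snippet, so this sketch uses `java.util.logging` as a stand-in: `Logger.log(Level, String, Object[])` returns early when the level is disabled and leaves the `{0}`/`{1}` substitution to the handler's formatter, which gives a similar "no guard needed" property to SLF4J's `{}` form (the small argument array is still allocated). The mapping is a hypothetical `Map`; the point is the ordering, with the lookup logged before the exception is thrown.

```java
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogBeforeThrowSketch {
  private static final Logger logger = Logger.getLogger(LogBeforeThrowSketch.class.getName());

  static List<String> getSenders(Map<Integer, List<String>> receiverToSenderMapping, int minorFragmentId) {
    List<String> senders = receiverToSenderMapping.get(minorFragmentId);

    // Logged first, so a failed lookup (null or empty) still leaves a debug trace.
    // No isLoggable() guard: log() itself returns early if FINE is disabled, and the
    // {0}/{1} parameters are only substituted when a handler formats the record.
    logger.log(Level.FINE, "Minor fragment {0}, receives data from following senders {1}",
        new Object[] {minorFragmentId, senders});

    if (senders == null || senders.isEmpty()) {
      throw new IllegalStateException(
          String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    }
    return senders;
  }
}
```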


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-12 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167616026
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
+    super(child);
+    this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+    if (senders == null || senders.size() <= 0) {
+      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
+    }
+
+    return new MergingReceiverPOP(senderMajorFragmentId, senders, orderings, false);
--- End diff --

I don't think locality plays a role in enabling or disabling spooling. If spooling was disabled for a remote receiver, it should also be disabled for a local receiver.


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-12 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167589289
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---
@@ -112,6 +120,73 @@ public RelWriter explainTerms(RelWriter pw) {
     return pw;
   }
 
+  /**
+   * This method creates a new UnorderedMux and Demux exchanges if mux operators are enabled.
+   * @param child input to the new Unordered[Mux/Demux]Prel or new HashToRandomExchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  @Override
+  public Prel getMuxPrel(Prel child, OptionManager options) {
+    boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
+    Prel newPrel = child;
+
+    final List<String> childFields = child.getRowType().getFieldNames();
+
+    List<RexNode> removeUpdatedExpr = null;
+
+    if (isMuxEnabled) {
+      // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields
+      final List<DistributionField> distFields = getFields();
+      final List<String> outputFieldNames = Lists.newArrayList(childFields);
+      final RexBuilder rexBuilder = getCluster().getRexBuilder();
+      final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList();
+
+      final HashPrelUtil.HashExpressionCreatorHelper hashHelper = new HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
+      final List<RexNode> distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size());
+  for(int i=0; i

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-12 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167626024
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java ---
@@ -34,4 +37,14 @@ public ExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
     return logicalVisitor.visitExchange(this, value);
   }
 
+  /**
+   * The derived classes can override this method to create relevant mux exchanges.
+   * If this method is not overrided the default behaviour is to clone itself.
+   * @param child input to the new muxPrel or new Exchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  public Prel getMuxPrel(Prel child, OptionManager options) {
--- End diff --

Consider renaming to `constructMuxPrel`.


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167447863
  
--- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java ---
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOrderedMuxExchange extends PlanTestBase {
+
+  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
+
+
+  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
+    long previousBigInt = Long.MIN_VALUE;
+
+    for (QueryDataBatch b : results) {
+      RecordBatchLoader loader = new RecordBatchLoader(allocator);
+      if (b.getHeader().getRowCount() > 0) {
+        loader.load(b.getHeader().getDef(),b.getData());
+        @SuppressWarnings({ "deprecation", "resource" })
+        IntVector c1 = (IntVector) loader.getValueAccessorById(IntVector.class,
+                   loader.getValueVectorId(new SchemaPath("id_i", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+        IntVector.Accessor a1 = c1.getAccessor();
+
+        for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt <= a1.get(i));
+          previousBigInt = a1.get(i);
+        }
+      }
+      loader.clear();
+      b.release();
+    }
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for order by clause.
+   * It checks by forcing the plan to create OrderedMuxExchange and also verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForOrderBy() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .maxParallelization(1)
+        .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+        ;
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      client.alterSession(ExecConstants.SLICE_TARGET, 10);
+      String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` ORDER BY id_i limit 10";
--- End diff --

I am not sure how the table is organized. Does it already have an ordered id_i column? If so, we should use a different column.
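If it helps answer the question, a hedged sketch of the check: collect `id_i` from a query without `ORDER BY` (the collection step is not shown) and test whether the values are already non-decreasing. If they are, the ordering assertion in `validateResults` may pass even when no real merge took place.

```java
import java.util.List;

public class SortednessCheckSketch {
  /** Returns true if the values are already in non-decreasing order. */
  static boolean isNonDecreasing(List<Integer> values) {
    for (int i = 1; i < values.size(); i++) {
      if (values.get(i - 1) > values.get(i)) {
        return false; // found an inversion, so the column is not pre-sorted
      }
    }
    return true;
  }
}
```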


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167447232
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java ---
@@ -20,133 +20,34 @@
 import com.google.common.collect.Lists;
 
 import org.apache.drill.exec.planner.physical.ExchangePrel;
-import org.apache.drill.exec.planner.physical.HashPrelUtil;
-import org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
-import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
-import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
-import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.math.BigDecimal;
-import java.util.Collections;
 import java.util.List;
 
 public class InsertLocalExchangeVisitor extends BasePrelVisitor {
-  private final boolean isMuxEnabled;
-  private final boolean isDeMuxEnabled;
-
-
-  public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper {
-    private final RexBuilder rexBuilder;
-
-    public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
-      this.rexBuilder = rexBuilder;
-    }
-
-    @Override
-    public RexNode createCall(String funcName, List inputFields) {
-      final DrillSqlOperator op =
-          new DrillSqlOperator(funcName, inputFields.size(), true, false);
-      return rexBuilder.makeCall(op, inputFields);
-    }
-  }
+  private final OptionManager options;
 
   public static Prel insertLocalExchanges(Prel prel, OptionManager options) {
     boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
     boolean isDeMuxEnabled = options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val;
 
     if (isMuxEnabled || isDeMuxEnabled) {
-      return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, isDeMuxEnabled), null);
+      return prel.accept(new InsertLocalExchangeVisitor(options), null);
--- End diff --

Since the local variables `isMuxEnabled` and `isDeMuxEnabled` are no longer being used, you can remove them (lines 33 and 34).


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167448218
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
+    super(child);
+    this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+    if (senders == null || senders.size() <= 0) {
+      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
+    }
+
+    return new MergingReceiverPOP(senderMajorFragmentId, senders, orderings, false);
--- End diff --

The HashToMergeExchange creates a MergingReceiver with spooling TRUE, whereas the SingleMergeExchange creates one with spooling FALSE. Although we don't test spooling, I feel the new OrderedMuxExchange should probably use the same spooling setting as the HashToMergeExchange, since both do the merge on the local Drillbits rather than on the Foreman.


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-11 Thread amansinha100
Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167448543
  
--- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java ---
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOrderedMuxExchange extends PlanTestBase {
+
+  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
+
+
+  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
+    long previousBigInt = Long.MIN_VALUE;
+
+    for (QueryDataBatch b : results) {
+      RecordBatchLoader loader = new RecordBatchLoader(allocator);
+      if (b.getHeader().getRowCount() > 0) {
+        loader.load(b.getHeader().getDef(),b.getData());
+        @SuppressWarnings({ "deprecation", "resource" })
+        IntVector c1 = (IntVector) loader.getValueAccessorById(IntVector.class,
+            loader.getValueVectorId(new SchemaPath("id_i", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+        IntVector.Accessor a1 = c1.getAccessor();
+
+        for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt <= a1.get(i));
+          previousBigInt = a1.get(i);
+        }
+      }
+      loader.clear();
+      b.release();
+    }
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for order by clause.
+   * It checks by forcing the plan to create OrderedMuxExchange and also verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForOrderBy() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .maxParallelization(1)
+        .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+        ;
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      client.alterSession(ExecConstants.SLICE_TARGET, 10);
+      String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` ORDER BY id_i limit 10";
+
+      String explainText = client.queryBuilder().sql(sql).explainText();
+      assertTrue(explainText.contains(ORDERED_MUX_EXCHANGE));
+      validateResults(client.allocator(), client.queryBuilder().sql(sql).results());
+    }
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for window functions.
+   * It checks by forcing the plan to create OrderedMuxExchange and also verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForWindowAgg() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .maxParallelization(1)
+        .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE,

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-08 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167092037
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
 ---
@@ -59,6 +59,7 @@ public static void setup() throws Exception {
     ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
         .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 1) // Unmanaged
         .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 1) // Unmanaged
+        .configProperty(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, 10 * 1024 * 1024) //use less memory for sorting.
--- End diff --

Why is the change necessary?


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-08 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167089878
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 ---
@@ -55,10 +56,10 @@
 
 
   public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
+  public RETURN visitSingleMergeExchange(SingleMergeExchange exchange, EXTRA value) throws EXCEP;
--- End diff --

The same question as for `PrelVisitor.java`: is it necessary to have a
separate `visitSingleMergeExchange`?


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-08 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167088606
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
 ---
@@ -35,7 +38,9 @@
   public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
-
+  public RETURN visitHashToRandomExchange(HashToRandomExchangePrel prel, EXTRA value) throws EXCEP;
--- End diff --

Are the 3 new methods necessary? Can `visitExchange` delegate to `prel`, or use
`instanceof`?
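A minimal sketch of the two alternatives raised here, assuming a simplified hierarchy: `ExchangePrel`, `Visitor`, `muxRewrite()` and the concrete classes below are hypothetical stand-ins, not Drill's actual `Prel`/`PrelVisitor` API, and the returned strings are only illustrative labels. Option A keeps a single `visitExchange` and branches with `instanceof`; option B lets each exchange override a hook so that `visitExchange` simply delegates to `prel`.

```java
// Hypothetical, simplified stand-ins for Drill's exchange Prels and PrelVisitor.
public class ExchangeVisitorSketch {

  interface Visitor<R> {
    R visitExchange(ExchangePrel prel);   // single entry point for all exchanges
  }

  static abstract class ExchangePrel {
    <R> R accept(Visitor<R> visitor) {
      return visitor.visitExchange(this);
    }

    // Option B hook (hypothetical): each exchange describes its own mux rewrite.
    String muxRewrite() {
      return "unchanged";
    }
  }

  static class HashToRandomExchangePrel extends ExchangePrel {
    @Override
    String muxRewrite() { return "UnorderedMuxExchange"; }
  }

  static class SingleMergeExchangePrel extends ExchangePrel {
    @Override
    String muxRewrite() { return "OrderedMuxExchange"; }
  }

  static class BroadcastExchangePrel extends ExchangePrel {}   // keeps the default

  // Option A: branch on the concrete type inside visitExchange.
  static class InstanceofVisitor implements Visitor<String> {
    @Override
    public String visitExchange(ExchangePrel prel) {
      if (prel instanceof SingleMergeExchangePrel) {
        return "OrderedMuxExchange";
      }
      if (prel instanceof HashToRandomExchangePrel) {
        return "UnorderedMuxExchange";
      }
      return "unchanged";
    }
  }

  // Option B: delegate the decision to the prel itself (plain polymorphism).
  static class DelegatingVisitor implements Visitor<String> {
    @Override
    public String visitExchange(ExchangePrel prel) {
      return prel.muxRewrite();
    }
  }

  public static void main(String[] args) {
    ExchangePrel[] prels = {
        new SingleMergeExchangePrel(), new HashToRandomExchangePrel(), new BroadcastExchangePrel()
    };
    for (ExchangePrel p : prels) {
      System.out.println(p.accept(new InstanceofVisitor()) + " / " + p.accept(new DelegatingVisitor()));
    }
  }
}
```

Option B avoids growing the visitor interface each time a new exchange type needs special handling, at the cost of placing planner-specific logic on the Prel classes themselves.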


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-08 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167091413
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, List<Ordering> orderings) {
+    super(child);
+    this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+    if (senders == null || senders.size() <= 0) {
--- End diff --

Add debug-level logging of `receiverToSenderMapping` and `minorFragmentId`.
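A minimal sketch of the requested debug output, assuming `java.util.logging` as a dependency-free stand-in for the SLF4J logger used in the diff, and `Integer` ids as stand-ins for `MinorFragmentEndpoint`; `sendersFor` is a hypothetical helper, not Drill code. It logs the whole `receiverToSenderMapping` at FINE (debug) level and also puts the mapping into the exception message, so a failed lookup is diagnosable even when debug logging is off.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: JUL stands in for SLF4J, Integer ids stand in for MinorFragmentEndpoint.
public class SenderLookupSketch {
  private static final Logger logger = Logger.getLogger(SenderLookupSketch.class.getName());

  static List<Integer> sendersFor(Map<Integer, List<Integer>> receiverToSenderMapping, int minorFragmentId) {
    // Debug-level trace of the receiver being resolved and the full mapping.
    if (logger.isLoggable(Level.FINE)) {
      logger.fine(String.format("Resolving senders for receiver %d, receiverToSenderMapping=%s",
          minorFragmentId, receiverToSenderMapping));
    }
    List<Integer> senders = receiverToSenderMapping.get(minorFragmentId);
    if (senders == null || senders.isEmpty()) {
      // Include the mapping so the failure is diagnosable without debug logging enabled.
      throw new IllegalStateException(String.format(
          "Failed to find senders for receiver [%d], receiverToSenderMapping=%s",
          minorFragmentId, receiverToSenderMapping));
    }
    return senders;
  }

  public static void main(String[] args) {
    Map<Integer, List<Integer>> mapping = new HashMap<>();
    mapping.put(0, Arrays.asList(0, 1, 2));
    mapping.put(1, Collections.emptyList());
    System.out.println(sendersFor(mapping, 0));
    try {
      sendersFor(mapping, 1);   // empty sender list -> IllegalStateException
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With SLF4J, the parameterized form `logger.debug("... {} ... {}", minorFragmentId, receiverToSenderMapping)` defers message formatting until the level is enabled, so an explicit `isDebugEnabled()` guard is usually unnecessary.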


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-08 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167090991
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, List<Ordering> orderings) {
--- End diff --

JSON annotation for `orderings`?


---


[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

2018-02-04 Thread HanumathRao
GitHub user HanumathRao opened a pull request:

https://github.com/apache/drill/pull/1110

DRILL-6115: SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

Currently, a SingleMergeExchange merges all the fragment streams on the
foreman. This can cause a CPU bottleneck and high memory consumption at the
foreman.

This PR introduces a new multiplex operator, OrderedMuxExchange, which merges
the minor-fragment streams belonging to one Drillbit and sends them as a single
output stream to the foreman.

The existing multiplex mechanism is used to introduce these operators.
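A minimal sketch of the merge pattern described above, assuming each minor-fragment stream is an already-sorted `List<Integer>`; `kWayMerge` and `twoLevelMerge` are hypothetical helpers, not Drill code. Each Drillbit first merges its local streams (the role OrderedMuxExchange plays), and the foreman then merges one stream per Drillbit, so the foreman's heap holds one entry per Drillbit instead of one per minor fragment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Sketch: per-Drillbit k-way merge followed by a foreman-side merge of one stream per Drillbit.
public class TwoLevelMergeSketch {

  // K-way merge of already-sorted lists using a min-heap of {value, listIndex, position} cursors.
  static List<Integer> kWayMerge(List<List<Integer>> sortedInputs) {
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
    for (int i = 0; i < sortedInputs.size(); i++) {
      if (!sortedInputs.get(i).isEmpty()) {
        heap.add(new int[] {sortedInputs.get(i).get(0), i, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      out.add(top[0]);
      int list = top[1];
      int next = top[2] + 1;
      if (next < sortedInputs.get(list).size()) {
        heap.add(new int[] {sortedInputs.get(list).get(next), list, next});
      }
    }
    return out;
  }

  // Level 1 runs on each Drillbit; level 2 runs on the foreman over one stream per Drillbit.
  static List<Integer> twoLevelMerge(List<List<List<Integer>>> streamsPerDrillbit) {
    List<List<Integer>> perDrillbit = new ArrayList<>();
    for (List<List<Integer>> localStreams : streamsPerDrillbit) {
      perDrillbit.add(kWayMerge(localStreams));
    }
    return kWayMerge(perDrillbit);
  }

  public static void main(String[] args) {
    List<List<List<Integer>>> cluster = Arrays.asList(
        Arrays.asList(Arrays.asList(1, 4, 9), Arrays.asList(2, 3)),   // Drillbit A: 2 minor fragments
        Arrays.asList(Arrays.asList(0, 8), Arrays.asList(5, 6, 7)));  // Drillbit B: 2 minor fragments
    System.out.println(twoLevelMerge(cluster)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```

The output is the same as a single flat merge over all four streams; only the distribution of merge work and memory changes.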

Please review this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HanumathRao/drill DRILL-6115

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1110


commit 43a71277aeec9bb377181728b2ce563437d7e46d
Author: hmaduri 
Date:   2018-01-22T00:42:28Z

DRILL-6115: SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.




---