DRILL-376: Create a separate DataTunnel for each receiving fragment Signed-off-by: Jacques Nadeau <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ad3ac806 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ad3ac806 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ad3ac806 Branch: refs/heads/master Commit: ad3ac8063a8c857db02dfdb5ca68672752f4a7c8 Parents: 08bc16d Author: Steven Phillips <[email protected]> Authored: Wed Feb 12 16:42:13 2014 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Mon Mar 3 23:22:17 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/drill/exec/ops/FragmentContext.java | 10 +++++----- .../exec/physical/impl/TestHashToRandomExchange.java | 2 +- exec/java-exec/src/test/resources/drill-module.conf | 4 ++-- .../src/test/resources/sender/hash_exchange.json | 10 ++++++++++ 4 files changed, 18 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ad3ac806/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 80a7819..630355e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -52,7 +52,7 @@ public class FragmentContext implements Closeable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); - private Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap(); + private Map<FragmentHandle, DataTunnel> tunnels = Maps.newHashMap(); private final DrillbitContext context; private final UserClientConnection connection; @@ -154,10 
+154,10 @@ public class FragmentContext implements Closeable { } public DataTunnel getDataTunnel(DrillbitEndpoint endpoint, FragmentHandle remoteHandle) { - DataTunnel tunnel = tunnels.get(endpoint); + DataTunnel tunnel = tunnels.get(remoteHandle); if (tunnel == null) { tunnel = context.getDataConnectionsPool().getTunnel(endpoint, remoteHandle); - tunnels.put(endpoint, tunnel); + tunnels.put(remoteHandle, tunnel); } return tunnel; } @@ -165,8 +165,8 @@ public class FragmentContext implements Closeable { /** * Add a new thread to this fragment's context. This thread will likely run for the life of the fragment but should be * terminated when the fragment completes. When the fragment completes, the threads will be interrupted. - * - * @param runnable + * + * @param thread */ public void addDaemonThread(Thread thread) { daemonThreads.add(thread); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ad3ac806/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java index bbe1c18..d20928d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java @@ -57,7 +57,7 @@ public class TestHashToRandomExchange extends PopUnitTestBase { if (b.getHeader().getRowCount() != 0) count += b.getHeader().getRowCount(); } - assertEquals(100, count); + assertEquals(200, count); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ad3ac806/exec/java-exec/src/test/resources/drill-module.conf ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/test/resources/drill-module.conf b/exec/java-exec/src/test/resources/drill-module.conf index 5803610..56d1c16 100644 --- a/exec/java-exec/src/test/resources/drill-module.conf +++ b/exec/java-exec/src/test/resources/drill-module.conf @@ -56,8 +56,8 @@ drill.exec: { start: 35000 }, work: { - max.width.per.endpoint: 1, + max.width.per.endpoint: 2, global.max.width: 100, - executor.threads: 1 + executor.threads: 4 } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ad3ac806/exec/java-exec/src/test/resources/sender/hash_exchange.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/sender/hash_exchange.json b/exec/java-exec/src/test/resources/sender/hash_exchange.json index d8f9579..fc45ec1 100644 --- a/exec/java-exec/src/test/resources/sender/hash_exchange.json +++ b/exec/java-exec/src/test/resources/sender/hash_exchange.json @@ -21,6 +21,16 @@ {name: "blue", type: "INT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"}, {name: "green", type: "VARBINARY", mode: "REQUIRED"} + ]}, + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"}, + {name: "green", type: "VARBINARY", mode: "REQUIRED"} + ]}, + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"}, + {name: "green", type: "VARBINARY", mode: "REQUIRED"} ]} ] },
