DRILL-1022: Increase default min hash table size and allow setting min/max size for hash table.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ff39fb83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ff39fb83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ff39fb83 Branch: refs/heads/master Commit: ff39fb8383e038aadbf4810a6b4ad5f22d25a181 Parents: 4243f54 Author: Aman Sinha <[email protected]> Authored: Tue Jun 17 22:41:14 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 18 21:50:09 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/drill/exec/ExecConstants.java | 9 +++++++++ .../drill/exec/physical/config/HashAggregate.java | 17 ----------------- .../exec/physical/impl/aggregate/HashAggBatch.java | 10 +++++++++- .../physical/impl/aggregate/HashAggTemplate.java | 5 +++-- .../physical/impl/aggregate/HashAggregator.java | 3 ++- .../drill/exec/physical/impl/common/HashTable.java | 2 +- .../exec/physical/impl/join/HashJoinBatch.java | 5 ++++- .../exec/server/options/SystemOptionManager.java | 5 +++-- .../exec/physical/impl/join/TestHashJoin.java | 13 +++++++++++-- 9 files changed, 42 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 6673c4c..7681dd5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec; +import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.server.options.OptionValidator; import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator; import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator; @@ -86,6 +87,14 @@ public interface ExecConstants { public static final String SLICE_TARGET = "planner.slice_target"; public static final OptionValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE, 1000000); + + /** + * HashTable runtime settings + */ + public static final String MIN_HASH_TABLE_SIZE_KEY = "exec.min_hash_table_size"; + public static final OptionValidator MIN_HASH_TABLE_SIZE = new PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.DEFAULT_INITIAL_CAPACITY); + public static final String MAX_HASH_TABLE_SIZE_KEY = "exec.max_hash_table_size"; + public static final OptionValidator MAX_HASH_TABLE_SIZE = new PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, HashTable.MAXIMUM_CAPACITY); /** * Limits the maximum level of parallelization to this factor time the number of Drillbits http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java index e4ce5f8..694570c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java @@ -21,8 +21,6 @@ import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; @@ -39,23 +37,12 @@ public class HashAggregate extends AbstractSingle { private final float cardinality; - // configuration parameters for the hash table - private final HashTableConfig htConfig; - @JsonCreator public HashAggregate(@JsonProperty("child") PhysicalOperator child, @JsonProperty("keys") NamedExpression[] groupByExprs, @JsonProperty("exprs") NamedExpression[] aggrExprs, @JsonProperty("cardinality") float cardinality) { super(child); this.groupByExprs = groupByExprs; this.aggrExprs = aggrExprs; this.cardinality = cardinality; - - int initial_capacity = cardinality > HashTable.DEFAULT_INITIAL_CAPACITY ? - (int) cardinality : HashTable.DEFAULT_INITIAL_CAPACITY; - - this.htConfig = new HashTableConfig(initial_capacity, - HashTable.DEFAULT_LOAD_FACTOR, - groupByExprs, - null /* no probe exprs */) ; } public NamedExpression[] getGroupByExprs() { @@ -70,10 +57,6 @@ public class HashAggregate extends AbstractSingle { return cardinality; } - public HashTableConfig getHtConfig() { - return htConfig; - } - @Override public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{ return physicalVisitor.visitHashAggregate(this, value); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index dd58562..6adc304 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -25,6 +25,7 @@ import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.sig.GeneratorMapping; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; @@ -48,6 +49,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome; +import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.physical.impl.common.HashTableConfig; import com.google.common.collect.Lists; import com.sun.codemodel.JExpr; @@ -220,7 +223,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { container.buildSchema(SelectionVectorMode.NONE); HashAggregator agg = context.getImplementationClass(top); - agg.setup(popConfig, context, this.stats, + HashTableConfig htConfig = new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(), + HashTable.DEFAULT_LOAD_FACTOR, + popConfig.getGroupByExprs(), + null /* no probe exprs */) ; + + agg.setup(popConfig, htConfig, context, this.stats, oContext.getAllocator(), incoming, this, aggrExprs, cgInner.getWorkspaceTypes(), http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 72095b7..5069a2d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -171,7 +171,8 @@ public abstract class HashAggTemplate implements HashAggregator { @Override - public void setup(HashAggregate hashAggrConfig, FragmentContext context, + public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, + FragmentContext context, OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, @@ -219,7 +220,7 @@ public abstract class HashAggTemplate implements HashAggregator { } } - ChainedHashTable ht = new ChainedHashTable(hashAggrConfig.getHtConfig(), context, allocator, incoming, null /* no incoming probe */, outgoing) ; + ChainedHashTable ht = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing) ; this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ; batchHolders = new ArrayList<BatchHolder>(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index d14880c..b94f299 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; @@ -42,7 +43,7 @@ public interface HashAggregator { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR } - public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, + public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index 429ec63..9f5d4f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -30,7 +30,7 @@ public interface HashTable { public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class); /** The initial default capacity of the hash table (in terms of number of buckets). */ - static final public int DEFAULT_INITIAL_CAPACITY = 1 << 8; + static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16; /** The maximum capacity of the hash table (in terms of number of buckets). */ static final public int MAXIMUM_CAPACITY = 1 << 30; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index c43b99a..11368e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -27,6 +27,7 @@ import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.sig.GeneratorMapping; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; @@ -257,7 +258,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } } - HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr); + HashTableConfig htConfig = + new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(), + HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr); // Create the chained hash table ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 8503197..a42640f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -64,8 +64,9 @@ public class SystemOptionManager implements OptionManager{ ExecConstants.LARGE_QUEUE_SIZE, ExecConstants.QUEUE_THRESHOLD_SIZE, ExecConstants.QUEUE_TIMEOUT, - ExecConstants.SMALL_QUEUE_SIZE - + ExecConstants.SMALL_QUEUE_SIZE, + ExecConstants.MIN_HASH_TABLE_SIZE, + ExecConstants.MAX_HASH_TABLE_SIZE }; public final PStoreConfig<OptionValue> config; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java index d4a86ca..e24426e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -29,6 +29,8 @@ import mockit.NonStrictExpectations; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.FileUtils; import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.cache.DistributedCache; +import org.apache.drill.exec.cache.local.LocalCache; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.memory.TopLevelAllocator; @@ -42,7 +44,6 @@ import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.pop.PopUnitTestBase; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos; -import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.user.QueryResultBatch; @@ -50,7 +51,10 @@ import org.apache.drill.exec.rpc.user.UserServer; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; -import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.SessionOptionManager; +import org.apache.drill.exec.server.options.SystemOptionManager; +import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; import org.apache.drill.exec.vector.ValueVector; import org.junit.Rule; import org.junit.Test; @@ -69,11 +73,16 @@ public class TestHashJoin extends PopUnitTestBase{ DrillConfig c = DrillConfig.create(); private void testHJMockScanCommon(final DrillbitContext bitContext, UserServer.UserClientConnection connection, String physicalPlan, int expectedRows) throws Throwable { + final LocalPStoreProvider provider = new LocalPStoreProvider(c); + provider.start(); + final SystemOptionManager opt = new SystemOptionManager(c, provider); + opt.init(); new NonStrictExpectations(){{ bitContext.getMetrics(); result = new MetricRegistry(); bitContext.getAllocator(); result = new TopLevelAllocator(); bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c); bitContext.getConfig(); result = c; + bitContext.getOptionManager(); result = opt; }}; PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
