This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit ae25994ef40dfa3724a47fc804b83ca3c5128851
Author: Paul Rogers <par0...@yahoo.com>
AuthorDate: Thu Apr 2 11:45:12 2020 -0700

    DRILL-7675: Work around for partitions sender memory use

    Adds an ad-hoc system/session option to limit partition sender
    memory use. See DRILL-7686 for the underlying issue. Also includes
    code cleanup and diagnostic tools.

    closes #2047
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  7 +-
 .../apache/drill/exec/compile/ClassBuilder.java    | 13 +---
 .../exec/physical/impl/join/HashJoinBatch.java     |  9 +--
 .../physical/impl/partitionsender/Partitioner.java |  5 +-
 .../impl/partitionsender/PartitionerTemplate.java  | 68 ++++++++++++++------
 .../org/apache/drill/exec/record/BatchSchema.java  | 30 +++++----
 .../apache/drill/exec/record/VectorContainer.java  | 14 ++++
 .../exec/server/options/BaseOptionManager.java     |  5 ++
 .../drill/exec/server/options/OptionSet.java       | 14 ++--
 .../exec/server/options/SystemOptionManager.java   |  1 +
 .../org/apache/drill/exec/vector/CopyUtil.java     | 26 ++++----
 .../apache/drill/exec/work/foreman/Foreman.java    | 31 +++++----
 .../drill/exec/work/foreman/QueryManager.java      | 23 +++----
 .../java-exec/src/main/resources/drill-module.conf |  1 +
 .../java/org/apache/drill/test/ClusterFixture.java | 35 ++++------
 .../apache/drill/test/ClusterFixtureBuilder.java   |  2 -
 .../drill/exec/record/MaterializedField.java       | 75 +++++++++++++-------
 17 files changed, 215 insertions(+), 144 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c421c73..3a16353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -162,7 +162,6 @@ public final class ExecConstants {
   public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
 
-
   // Hash Aggregate Options
   public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
   public static final LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128,
@@ -189,6 +188,12 @@ public final class ExecConstants {
   public static final BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY,
       new OptionDescription("Hash Aggregates ignore memory limits when enabled (true). When disabled (false), Hash Aggregates fail when memory is set too low."));
 
+  // Partitioner options
+  public static final String PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY = "exec.partition.mem_throttle";
+  public static final LongValidator PARTITIONER_MEMORY_REDUCTION_THRESHOLD_VALIDATOR =
+      new RangeLongValidator(PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY, 0, Integer.MAX_VALUE,
+          new OptionDescription("Linearly reduces partition sender buffer row count after this number of receivers. Default is 0 (disabled). (Since Drill 1.18)"));
+
   public static final String SSL_PROVIDER = "drill.exec.ssl.provider"; // valid values are "JDK", "OPENSSL" // default JDK
   public static final String SSL_PROTOCOL = "drill.exec.ssl.protocol"; // valid values are SSL, SSLV2, SSLV3, TLS, TLSV1, TLSv1.1, TLSv1.2(default)
   public static final String SSL_KEYSTORE_TYPE = "drill.exec.ssl.keyStoreType";
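Since the new key is registered as an ordinary system/session option, it can be toggled with the ALTER SYSTEM or ALTER SESSION commands mentioned elsewhere in this patch. Below is a minimal sketch of doing that over Drill's standard JDBC driver; the connection URL and the threshold of 200 are illustrative assumptions, not values taken from the commit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class MemThrottleSetup {
      public static void main(String[] args) throws Exception {
        // Assumes a local Drillbit reachable via the embedded-ZK URL.
        try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
             Statement stmt = conn.createStatement()) {
          // Shrink send buffers once a query fans out to more than 200 receivers.
          stmt.execute("ALTER SYSTEM SET `exec.partition.mem_throttle` = 200");
          // Setting the option back to 0 (the default) disables the throttle.
        }
      }
    }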
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
index b4790c8..2c580da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
@@ -29,6 +29,8 @@ import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.server.options.OptionSet;
 import org.codehaus.commons.compiler.CompileException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements the "plain Java" method of code generation and
@@ -76,10 +78,9 @@ import org.codehaus.commons.compiler.CompileException;
  * The setting to prefer plain Java is ignored for any remaining generated
  * classes not marked as plain Java capable.
  */
-
 public class ClassBuilder {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassBuilder.class);
+  private static final Logger logger = LoggerFactory.getLogger(ClassBuilder.class);
   public static final String CODE_DIR_OPTION = CodeCompiler.COMPILE_BASE + ".code_dir";
 
   private final DrillConfig config;
@@ -95,7 +96,6 @@ public class ClassBuilder {
     // point your debugger to the directory set below, and you
     // can step into the code for debugging. Code is not saved
     // be default because doing so is expensive and unnecessary.
-
     codeDir = new File(config.getString(CODE_DIR_OPTION));
   }
 
@@ -109,7 +109,6 @@ public class ClassBuilder {
    * @return the class that the code generator defines
    * @throws ClassTransformationException
    */
-
   public Class<?> getImplementationClass(CodeGenerator<?> cg) throws ClassTransformationException {
     try {
       return compileClass(cg);
@@ -133,23 +132,19 @@ public class ClassBuilder {
     final long t1 = System.nanoTime();
 
     // Get the plain Java code.
-
     String code = cg.getGeneratedCode();
 
     // Get the class names (dotted, file path, etc.)
-
     String className = cg.getMaterializedClassName();
     ClassTransformer.ClassNames name = new ClassTransformer.ClassNames(className);
 
     // A key advantage of this method is that the code can be
     // saved and debugged, if needed.
-
     if (cg.isCodeToBeSaved()) {
       saveCode(code, name);
     }
 
     // Compile the code and load it into a class loader.
-
     CachedClassLoader classLoader = new CachedClassLoader();
     ClassCompilerSelector compilerSelector = new ClassCompilerSelector(classLoader, config, options);
     Map<String,byte[]> results = compilerSelector.compile(name, code);
@@ -165,7 +160,6 @@ public class ClassBuilder {
         (System.nanoTime() - t1 + 500_000) / 1_000_000);
 
     // Get the class from the class loader.
-
     try {
       return classLoader.findClass(className);
     } catch (ClassNotFoundException e) {
@@ -184,7 +178,6 @@ public class ClassBuilder {
    * @param code the source code
    * @param name the class name
    */
-
   private void saveCode(String code, ClassNames name) {
     String pathName = name.slash + ".java";

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 843d20b..00d5bae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -945,7 +945,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
       // everything in memory
       String message = String.format(
           "When using the minimum number of partitions %d we require %s memory but only have %s available. "
-          + "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
+          + "Forcing legacy behavior of using unbounded memory in order to prevent regressions.",
           numPartitions,
           FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
           FileUtils.byteCountToDisplaySize(allocator.getLimit()));
@@ -984,7 +984,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
       // is enabled
       if (reason == null) {
         boolean fallbackEnabled = context.getOptions()
-            .getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+            .getBoolean(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY);
         if (fallbackEnabled) {
           logger.warn(
               "Spilling is disabled - not enough memory available for internal partitioning. Falling back"
@@ -992,8 +992,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP>
         } else {
           throw UserException.resourceError().message(String.format(
               "Not enough memory for internal partitioning and fallback mechanism for "
-              + "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter "
-              + "session/system command or increase memory limit for Drillbit",
+              + "HashJoin to use unbounded memory is disabled.\n"
+              + "Either enable fallback option %s using ALTER "
+              + "SESSION/SYSTEM command or increase the memory limit for the Drillbit",
               ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
         }
       } else {

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 76c60e8..4f22c5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -48,8 +48,9 @@ public interface Partitioner {
   void initialize();
   void clear();
   List<? extends PartitionOutgoingBatch> getOutgoingBatches();
+
   /**
-   * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
+   * Get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
    * @param index
   * @return PartitionOutgoingBatch that matches index within Partitioner. This method can
   *         return null if index does not fall within boundary of this Partitioner
@@ -58,4 +59,4 @@ public interface Partitioner {
   OperatorStats getStats();
 
   TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
-}
\ No newline at end of file
+}

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index a3149b8..95e62e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -24,10 +24,11 @@ import java.util.List;
 import javax.inject.Named;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.ExchangeFragmentContext;
@@ -50,7 +51,6 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,9 +104,44 @@ public abstract class PartitionerTemplate implements Partitioner {
     this.end = end;
     doSetup(context, incoming, null);
 
-    // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory
-    // allocated.
-    if (popConfig.getDestinations().size() > 1000) {
+    // Consider the system/session option to allow the buffer size to shrink
+    // linearly with the increase in slice count, over some limit:
+    // exec.partition.mem_throttle:
+    // The default is 0, which leaves the current logic unchanged.
+    // If set to a positive value, then when the slice count exceeds that
+    // amount, the buffer size per sender is reduced.
+    // The reduction factor is 1 / (slice count - threshold), with a minimum
+    // batch size of 256 records.
+    //
+    // So, if we set the threshold at 2, and run 10 slices, each slice will
+    // get 1024 / 8 = 128 records, raised to 256 by the minimum.
+    //
+    // This option controls memory, but at an obvious cost of increasing overhead.
+    // One could argue that this is a good thing. As the number of senders
+    // increases, the number of records going to each sender decreases, which
+    // increases the time that batches must accumulate before they are sent.
+    //
+    // If the option is enabled, and buffer size reduction kicks in, you'll
+    // find an info-level log message which details the reduction:
+    //   exec.partition.mem_throttle is set to 2: 10 receivers,
+    //   reduced send buffer size from 1024 to 256 rows
+    //
+    // See DRILL-7675, DRILL-7686.
+    int destinationCount = popConfig.getDestinations().size();
+    int reductionCutoff = oContext.getFragmentContext().getOptions().getInt(
+        ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY);
+    if (reductionCutoff > 0 && destinationCount >= reductionCutoff) {
+      int reducedBatchSize = Math.max(256,
+          (DEFAULT_RECORD_BATCH_SIZE + 1) / (destinationCount - reductionCutoff));
+      outgoingRecordBatchSize = BaseAllocator.nextPowerOfTwo(reducedBatchSize) - 1;
+      logger.info("{} is set to {}: {} receivers, reduced send buffer size from {} to {} rows",
+          ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY,
+          reductionCutoff, destinationCount,
+          DEFAULT_RECORD_BATCH_SIZE, outgoingRecordBatchSize);
+    } else if (destinationCount > 1000) {
+      // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory
+      // allocated.
+      // Always keep the recordCount as (2^x) - 1 to better utilize the memory allocation in ValueVectors
       outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1;
     }
 
@@ -114,7 +149,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     int fieldId = 0;
     for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
       // create outgoingBatches only for subset of Destination Points
-      if ( fieldId >= start && fieldId < end ) {
+      if (fieldId >= start && fieldId < end) {
         logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId);
         outgoingBatches.add(newOutgoingRecordBatch(stats, popConfig,
             context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId()));
@@ -149,7 +184,6 @@ public abstract class PartitionerTemplate implements Partitioner {
    * generated inner class. Byte-code manipulation appears to fix up the byte codes
    * directly. The name is special, it must be "new" + inner class name.
    */
-
   protected OutgoingRecordBatch newOutgoingRecordBatch(
       OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
       FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
@@ -259,24 +293,23 @@ public abstract class PartitionerTemplate implements Partitioner {
     private final AccountingDataTunnel tunnel;
     private final HashPartitionSender operator;
     private final FragmentContext context;
-    private final BufferAllocator allocator;
-    private final VectorContainer vectorContainer = new VectorContainer();
+    private final VectorContainer vectorContainer;
     private final int oppositeMinorFragmentId;
     private final OperatorStats stats;
 
-    private boolean isLast = false;
-    private boolean dropAll = false;
+    private boolean isLast;
+    private boolean dropAll;
     private int recordCount;
     private int totalRecords;
 
     public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
                                FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
       this.context = context;
-      this.allocator = allocator;
       this.operator = operator;
       this.tunnel = tunnel;
       this.stats = stats;
       this.oppositeMinorFragmentId = oppositeMinorFragmentId;
+      this.vectorContainer = new VectorContainer(allocator);
     }
 
     protected void copy(int inIndex) throws IOException {
@@ -376,9 +409,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     }
 
     private void allocateOutgoingRecordBatch() {
-      for (VectorWrapper<?> v : vectorContainer) {
-        v.getValueVector().allocateNew();
-      }
+      vectorContainer.allocate(outgoingRecordBatchSize);
     }
 
     public void updateStats(FragmentWritableBatch writableBatch) {
@@ -391,12 +422,7 @@ public abstract class PartitionerTemplate implements Partitioner {
      * Initialize the OutgoingBatch based on the current schema in incoming RecordBatch
      */
     public void initializeBatch() {
-      for (VectorWrapper<?> v : incoming) {
-        // create new vector
-        ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
-        outgoingVector.setInitialCapacity(outgoingRecordBatchSize);
-        vectorContainer.add(outgoingVector);
-      }
+      vectorContainer.buildFrom(incoming.getSchema());
       allocateOutgoingRecordBatch();
       try {
         doSetup(incoming, vectorContainer);
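To make the arithmetic in the comment above easy to check, here is a self-contained sketch of the reduction rule as this hunk implements it. DEFAULT_RECORD_BATCH_SIZE and nextPowerOfTwo() are stand-ins for the Drill internals of the same names, chosen to mirror the comment's round numbers rather than copied from the engine.

    public class ThrottleMath {
      // Stand-in for PartitionerTemplate.DEFAULT_RECORD_BATCH_SIZE, kept as (2^x) - 1.
      static final int DEFAULT_RECORD_BATCH_SIZE = 1024 - 1;

      // Stand-in for BaseAllocator.nextPowerOfTwo(); assumes n > 1.
      static int nextPowerOfTwo(int n) {
        return Integer.highestOneBit(n - 1) << 1;
      }

      static int sendBufferRows(int destinationCount, int reductionCutoff) {
        if (reductionCutoff > 0 && destinationCount >= reductionCutoff) {
          int reduced = Math.max(256,
              (DEFAULT_RECORD_BATCH_SIZE + 1) / (destinationCount - reductionCutoff));
          return nextPowerOfTwo(reduced) - 1;
        } else if (destinationCount > 1000) {
          return (DEFAULT_RECORD_BATCH_SIZE + 1) / 2 - 1; // legacy halving
        }
        return DEFAULT_RECORD_BATCH_SIZE;
      }

      public static void main(String[] args) {
        // The worked example from the comment: threshold 2, 10 slices.
        System.out.println(sendBufferRows(10, 2));   // 255: 1024/8 = 128, raised to 256, then (2^8)-1
        System.out.println(sendBufferRows(2000, 0)); // 511: throttle off, legacy halving over 1000 senders
      }
    }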
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 61850b7..6d8865d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -123,7 +123,6 @@ public class BatchSchema implements Iterable<MaterializedField> {
    * Hence we should make use of {@link BatchSchema#isEquivalent(BatchSchema)} method instead since
    * {@link MaterializedField#isEquivalent(MaterializedField)} method is updated to remove the reference check.
    */
-
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
@@ -145,7 +144,6 @@ public class BatchSchema implements Iterable<MaterializedField> {
 
     // Compare names.
     // (DRILL-5525: actually compares all fields.)
-
     if (!fields.equals(other.fields)) {
       return false;
     }
@@ -153,7 +151,6 @@ public class BatchSchema implements Iterable<MaterializedField> {
     // Compare types
     // (DRILL-5525: this code is redundant because any differences
     // will fail above.)
-
     for (int i = 0; i < fields.size(); i++) {
       MajorType t1 = fields.get(i).getType();
       MajorType t2 = other.fields.get(i).getType();
@@ -181,7 +178,6 @@ public class BatchSchema implements Iterable<MaterializedField> {
    * the {@link MaterializedField#isEquivalent(MaterializedField)} rules,
    * false otherwise
    */
-
   public boolean isEquivalent(BatchSchema other) {
     if (this == other) {
       return true;
@@ -209,17 +205,15 @@ public class BatchSchema implements Iterable<MaterializedField> {
   private boolean majorTypeEqual(MajorType t1, MajorType t2) {
     if (t1.equals(t2)) {
       return true;
-    }
-    if (!t1.getMinorType().equals(t2.getMinorType())) {
+    } else if (!t1.getMinorType().equals(t2.getMinorType())) {
       return false;
-    }
-    if (!t1.getMode().equals(t2.getMode())) {
+    } else if (!t1.getMode().equals(t2.getMode())) {
       return false;
-    }
-    if (!Sets.newHashSet(t1.getSubTypeList()).equals(Sets.newHashSet(t2.getSubTypeList()))) {
+    } else if (!Sets.newHashSet(t1.getSubTypeList()).equals(Sets.newHashSet(t2.getSubTypeList()))) {
       return false;
+    } else {
+      return true;
     }
-    return true;
   }
 
   /**
@@ -237,7 +231,6 @@ public class BatchSchema implements Iterable<MaterializedField> {
    * @param otherSchema the schema to merge with this one
    * @return the new, merged, schema
    */
-
   public BatchSchema merge(BatchSchema otherSchema) {
     if (selectionVectorMode != SelectionVectorMode.NONE ||
         otherSchema.selectionVectorMode != SelectionVectorMode.NONE) {
@@ -249,4 +242,17 @@ public class BatchSchema implements Iterable<MaterializedField> {
     mergedFields.addAll(otherSchema.fields);
     return new BatchSchema(selectionVectorMode, mergedFields);
   }
+
+  /**
+   * Format the schema into a multi-line format. Useful when debugging a query with
+   * a very wide schema as the usual single-line format is far too hard to read.
+   */
+  public String format() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("Batch Schema:\n");
+    for (MaterializedField field : fields) {
+      field.format(buf, 1);
+    }
+    return buf.toString();
+  }
 }
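A rough illustration of the new diagnostic: the snippet below builds a tiny schema by hand and prints it with format(). The field names are invented for the example, and the exact indentation comes from MaterializedField.format(), added later in this patch.

    import java.util.Arrays;

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.record.BatchSchema;
    import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    import org.apache.drill.exec.record.MaterializedField;

    public class SchemaFormatDemo {
      public static void main(String[] args) {
        MaterializedField id = MaterializedField.create("order_id", Types.required(MinorType.BIGINT));
        MaterializedField name = MaterializedField.create("name", Types.optional(MinorType.VARCHAR));
        BatchSchema schema = new BatchSchema(SelectionVectorMode.NONE, Arrays.asList(id, name));
        System.out.print(schema.format());
        // Expected shape, one field per line, children (if any) indented:
        // Batch Schema:
        //  `order_id` (BIGINT:REQUIRED)
        //  `name` (VARCHAR:OPTIONAL)
      }
    }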
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 3796e5a..4ec0b8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -440,6 +440,14 @@ public class VectorContainer implements VectorAccessible {
     return wrappers.size();
   }
 
+  public void allocate(int recordCount) {
+    for (VectorWrapper<?> w : wrappers) {
+      ValueVector v = w.getValueVector();
+      v.setInitialCapacity(recordCount);
+      v.allocateNew();
+    }
+  }
+
   public void allocateNew() {
     for (VectorWrapper<?> w : wrappers) {
       w.getValueVector().allocateNew();
@@ -559,4 +567,10 @@ public class VectorContainer implements VectorAccessible {
       addOrGet(wrapper.getField());
     }
   }
+
+  public void buildFrom(BatchSchema sourceSchema) {
+    for (MaterializedField field : sourceSchema) {
+      add(TypeHelper.getNewVector(field, allocator));
+    }
+  }
 }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
index a502e07..ee786c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
@@ -69,6 +69,11 @@ public abstract class BaseOptionManager implements OptionManager {
   }
 
   @Override
+  public int getInt(String name) {
+    return (int) getLong(name);
+  }
+
+  @Override
   public long getLong(String name) {
     return getByType(name, Kind.LONG).num_val;
   }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
index e96d571..7e85a76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
@@ -83,18 +83,26 @@ public interface OptionSet {
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
-
   boolean getBoolean(String name);
 
   /**
-   * Return the value of a long option.
+   * Return the value of a long option as an int
    *
    * @param name option name
    * @return the long value
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
+  int getInt(String name);
+
+  /**
+   * Return the value of a long option.
+   *
+   * @param name option name
+   * @return the long value
+   * @throws IllegalArgumentException if the option is undefined or
+   * is not of the correct data type
+   */
   long getLong(String name);
 
   /**
@@ -105,7 +113,6 @@ public interface OptionSet {
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
-
   double getDouble(String name);
 
   /**
@@ -116,6 +123,5 @@ public interface OptionSet {
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
-
   String getString(String name);
 }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5d2598c..3eb643c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -188,6 +188,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
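One nuance of the new accessor: option values are stored internally as longs, and BaseOptionManager.getInt() narrows with a plain cast, so an out-of-range stored value would wrap silently. The sketch below isolates that narrowing rule; note that the validator registered above caps exec.partition.mem_throttle at Integer.MAX_VALUE, which keeps the cast safe for this option.

    public class GetIntNarrowing {
      // Mirror of BaseOptionManager.getInt(): a plain cast from the stored long.
      static int getInt(long storedOptionValue) {
        return (int) storedOptionValue;
      }

      public static void main(String[] args) {
        System.out.println(getInt(200L));            // 200, as expected
        System.out.println(getInt(5_000_000_000L));  // 705032704: silent wraparound
      }
    }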
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
index 28a98aa..f24ea09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
@@ -28,15 +28,18 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JVar;
 
 public class CopyUtil {
-  public static void generateCopies(ClassGenerator<?> g, VectorAccessible batch, boolean hyper){
+
+  public static void generateCopies(ClassGenerator<?> g, VectorAccessible batch, boolean hyper) {
     // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
     int fieldId = 0;
 
     JExpression inIndex = JExpr.direct("inIndex");
     JExpression outIndex = JExpr.direct("outIndex");
-    for(VectorWrapper<?> vv : batch) {
+    for (VectorWrapper<?> vv : batch) {
       String copyMethod;
-      if (!Types.isFixedWidthType(vv.getField().getType()) || Types.isRepeated(vv.getField().getType()) || Types.isComplex(vv.getField().getType())) {
+      if (!Types.isFixedWidthType(vv.getField().getType()) ||
+          Types.isRepeated(vv.getField().getType()) ||
+          Types.isComplex(vv.getField().getType())) {
         copyMethod = "copyFromSafe";
       } else {
         copyMethod = "copyFrom";
@@ -53,19 +56,13 @@ public class CopyUtil {
           .build();
       JVar outVV = g.declareVectorValueSetupAndMember("outgoing", outFieldId);
 
-      if(hyper){
-
-        g.getEvalBlock().add(
-            outVV
+      if (hyper) {
+        g.getEvalBlock().add(outVV
             .invoke(copyMethod)
-            .arg(
-                inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+            .arg(inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
             .arg(outIndex)
-            .arg(
-                inVV.component(inIndex.shrz(JExpr.lit(16)))
-            )
-        );
-      }else{
+            .arg(inVV.component(inIndex.shrz(JExpr.lit(16)))));
+      } else {
         g.getEvalBlock().add(outVV.invoke(copyMethod).arg(inIndex).arg(outIndex).arg(inVV));
       }
 
@@ -73,5 +70,4 @@ public class CopyUtil {
       fieldId++;
     }
   }
-
 }
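For orientation, the eval block that generateCopies() emits behaves roughly like the hand-written method below. The vector type and names are invented for illustration; the point is the split of the hyper index that the JCodeModel calls above are building, where the low 16 bits select the row and the high bits select the batch.

    import org.apache.drill.exec.vector.BigIntVector;

    class GeneratedCopySketch {
      // Hand-written equivalent of the generated copy, hyper case.
      // 'in' stands for the array of input vectors (one per batch in the
      // hyper-vector); 'out' is the single outgoing vector of the same type.
      void copyHyper(BigIntVector[] in, BigIntVector out, int inIndex, int outIndex) {
        out.copyFromSafe(inIndex & Character.MAX_VALUE,  // row within the selected batch
            outIndex,
            in[inIndex >>> 16]);                         // batch selected by the high bits
      }
    }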
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 962d74e..1cecbd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -55,7 +55,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.FailureUtils;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.Pointer;
@@ -65,6 +65,8 @@ import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Date;
@@ -93,8 +95,8 @@ import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
  */
 public class Foreman implements Runnable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-  private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
+  private static final Logger logger = LoggerFactory.getLogger(Foreman.class);
+  private static final Logger queryLogger = LoggerFactory.getLogger("query.logger");
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class);
 
   public enum ProfileOption { SYNC, ASYNC, NONE }
@@ -108,8 +110,7 @@ public class Foreman implements Runnable {
   private final QueryManager queryManager; // handles lower-level details of query execution
   private final DrillbitContext drillbitContext;
   private final UserClientConnection initiatingClient; // used to send responses
-  private boolean resume = false;
-  private final ProfileOption profileOption;
+  private boolean resume;
 
   private final QueryResourceManager queryRM;
@@ -122,7 +123,7 @@ public class Foreman implements Runnable {
   private String queryText;
 
   private RuntimeFilterRouter runtimeFilterRouter;
-  private boolean enableRuntimeFilter;
+  private final boolean enableRuntimeFilter;
 
   /**
    * Constructor. Sets up the Foreman, but does not initiate any execution.
@@ -154,11 +155,9 @@ public class Foreman implements Runnable {
     this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
     this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
     this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
-    this.profileOption = setProfileOption(queryContext.getOptions());
     this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
   }
 
-
   /**
    * @return query id
    */
@@ -326,11 +325,15 @@ public class Foreman implements Runnable {
     }
   }
 
-  private ProfileOption setProfileOption(OptionManager options) {
-    if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
+  private ProfileOption getProfileOption(QueryContext queryContext) {
+    if (queryContext.isSkipProfileWrite()) {
+      return ProfileOption.NONE;
+    }
+    OptionSet options = queryContext.getOptions();
+    if (!options.getBoolean(ExecConstants.ENABLE_QUERY_PROFILE_OPTION)) {
       return ProfileOption.NONE;
     }
-    if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) {
+    if (options.getBoolean(ExecConstants.QUERY_PROFILE_DEBUG_OPTION)) {
       return ProfileOption.SYNC;
     } else {
       return ProfileOption.ASYNC;
@@ -792,8 +795,8 @@ public class Foreman implements Runnable {
 
       // Debug option: write query profile before sending final results so that
      // the client can be certain the profile exists.
-      final boolean skipProfileWrite = queryContext.isSkipProfileWrite();
-      if (profileOption == ProfileOption.SYNC && !skipProfileWrite) {
+      ProfileOption profileOption = getProfileOption(queryContext);
+      if (profileOption == ProfileOption.SYNC) {
         queryManager.writeFinalProfile(uex);
       }
 
@@ -823,7 +826,7 @@ public class Foreman implements Runnable {
       // storage write; query completion occurs in parallel with profile
       // persistence.
-      if (profileOption == ProfileOption.ASYNC && !skipProfileWrite) {
+      if (profileOption == ProfileOption.ASYNC) {
         queryManager.writeFinalProfile(uex);
       }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 0c140a4..66739de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -58,12 +58,15 @@ import com.carrotsearch.hppc.predicates.IntObjectPredicate;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments.
+ * Each Foreman holds its own QueryManager. This manages the events associated
+ * with execution of a particular query across all fragments.
  */
 public class QueryManager implements AutoCloseable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(QueryManager.class);
 
   private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
   private final QueryId queryId;
@@ -240,10 +243,10 @@ public class QueryManager implements AutoCloseable {
     public void close() throws Exception { }
 
   /*
-     * This assumes that the FragmentStatusListener implementation takes action when it hears
-     * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
-     * but log messages.
-     */
+   * This assumes that the FragmentStatusListener implementation takes action when it hears
+   * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
+   * but log messages.
+   */
   private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
     /**
      * An enum of possible signals that {@link SignalListener} listens to.
     */
@@ -461,13 +464,10 @@ public class QueryManager implements AutoCloseable {
    * there is a node failure, we can then correctly track how many outstanding messages will never arrive.
    */
   private class NodeTracker {
-    private final DrillbitEndpoint endpoint;
     private final AtomicInteger totalFragments = new AtomicInteger(0);
     private final AtomicInteger completedFragments = new AtomicInteger(0);
 
-    public NodeTracker(final DrillbitEndpoint endpoint) {
-      this.endpoint = endpoint;
-    }
+    public NodeTracker(final DrillbitEndpoint endpoint) { }
 
     /**
      * Increments the number of fragment this node is running.
@@ -506,7 +506,6 @@ public class QueryManager implements AutoCloseable {
       }
       return true;
     }
-
   }
 
   /**
@@ -556,7 +555,6 @@ public class QueryManager implements AutoCloseable {
     }
   };
 
-
   public DrillbitStatusListener getDrillbitStatusListener() {
     return drillbitStatusListener;
   }
@@ -601,7 +599,6 @@ public class QueryManager implements AutoCloseable {
             new ForemanException(String.format("One more more nodes lost connectivity during query. Identified nodes were [%s].",
                 failedNodeList)));
       }
-
     }
   };
 }

diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 9e306e6..46ae53a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -541,6 +541,7 @@ drill.exec.options: {
     exec.java_compiler_janino_maxsize: 262144,
     exec.max_hash_table_size: 1073741824,
     exec.min_hash_table_size: 65536,
+    exec.partition.mem_throttle: 0,
     exec.persistent_table.umask: "002",
     exec.query.progress.update: true,
     exec.query_profile.debug_mode: false,
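For tests that want a non-default threshold, the usual pattern is to override the option through the test fixtures touched below. The test class, query, and threshold of 5 in this sketch are hypothetical; it assumes the systemOption() builder method that ClusterFixture-based tests commonly use.

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.test.ClusterFixture;
    import org.apache.drill.test.ClusterFixtureBuilder;
    import org.apache.drill.test.ClusterTest;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class TestPartitionMemThrottle extends ClusterTest {

      @BeforeClass
      public static void setup() throws Exception {
        // Start an embedded cluster with the throttle engaged at 5 receivers.
        ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
            .systemOption(ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY, 5);
        startCluster(builder);
      }

      @Test
      public void testQueryRunsWithThrottle() throws Exception {
        // Any exchange-heavy query exercises the partition sender path.
        client.queryBuilder()
            .sql("SELECT employee_id FROM cp.`employee.json` ORDER BY employee_id")
            .run();
      }
    }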
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 80e2a26..c8b64a4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -80,7 +80,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   {
     // Properties here mimic those in drill-root/pom.xml, Surefire plugin
     // configuration. They allow tests to run successfully in Eclipse.
-
     put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false);
 
     // The CTTAS function requires that the default temporary workspace be
     // dfs.tmp. But, the test setup marks dfs.tmp as read-only. To work
     // around this, tests are supposed to use dfs. So, we need to
     // set the default temporary workspace to dfs.tmp.
-
     put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, DFS_TMP_SCHEMA);
 
     put(ExecConstants.HTTP_ENABLE, false);
     put("drill.catastrophic_to_standard_out", true);
 
     // Verbose errors.
-
     put(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY, true);
 
     // See Drillbit.close. The Drillbit normally waits a specified amount
     // of time for ZK registration to drop. But, embedded Drillbits normally
     // don't use ZK, so no need to wait.
-
     put(ExecConstants.ZK_REFRESH, 0);
 
     // This is just a test, no need to be heavy-duty on threads.
     // This is the number of server and client RPC threads. The
     // production default is DEFAULT_SERVER_RPC_THREADS.
-
     put(ExecConstants.BIT_SERVER_RPC_THREADS, 2);
 
     // No need for many scanners except when explicitly testing that
     // behavior. Production default is DEFAULT_SCAN_THREADS
-
     put(ExecConstants.SCAN_THREADPOOL_SIZE, 4);
 
     // Define a useful root location for the ZK persistent
     // storage. Profiles will go here when running in distributed
     // mode.
-
     put(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT, "/tmp/drill/tests");
   }
 
@@ -147,9 +140,9 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
       startDrillbits();
       applyOptions();
     } catch (Exception e) {
+
       // Translate exceptions to unchecked to avoid cluttering
       // tests. Failures will simply fail the test itself.
-
       throw new IllegalStateException("Cluster fixture setup failed", e);
     }
   }
 
@@ -158,7 +151,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
    * Set the client properties to be used by client fixture.
    */
   private void setClientProps() {
-    clientProps = builder.clientProps;
+    clientProps = builder.clientProps;
   }
 
   public Properties getClientProps() {
@@ -166,17 +159,17 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   }
 
   private void configureZk() {
-    // Start ZK if requested.
+
+    // Start ZK if requested.
     String zkConnect;
     if (builder.zkHelper != null) {
-      // Case where the test itself started ZK and we're only using it.
+
+      // Case where the test itself started ZK and we're only using it.
       zkHelper = builder.zkHelper;
       ownsZK = false;
     } else if (builder.localZkCount > 0) {
-      // Case where we need a local ZK just for this test cluster.
+
+      // Case where we need a local ZK just for this test cluster.
       zkHelper = new ZookeeperHelper();
       zkHelper.startZookeeper(builder.localZkCount);
       ownsZK = true;
@@ -189,7 +182,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     // in config properties defined at run time. Drill does not allow
     // combining locally-set properties and a config file: it is one
     // or the other.
-
     if (builder.configBuilder().hasResource()) {
       throw new IllegalArgumentException("Cannot specify a local ZK while using an external config file.");
     }
@@ -202,27 +194,27 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   }
 
   private void createConfig() throws Exception {
+
     // Create a config
     // Because of the way DrillConfig works, we can set the ZK
     // connection string only if a property set is provided.
-
     config = builder.configBuilder.build();
 
     if (builder.usingZk) {
-      // Distribute drillbit using ZK (in-process or external)
+
+      // Distribute drillbit using ZK (in-process or external)
       serviceSet = null;
       usesZk = true;
     } else {
-      // Embedded Drillbit.
+
+      // Embedded Drillbit.
       serviceSet = RemoteServiceSet.getLocalServiceSet();
     }
   }
 
   private void startDrillbits() throws Exception {
-    // Start the Drillbits.
+
+    // Start the Drillbits.
     Preconditions.checkArgument(builder.bitCount > 0);
     int bitCount = builder.bitCount;
     for (int i = 0; i < bitCount; i++) {
@@ -230,7 +222,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
       bit.run();
 
       // Bit name and registration.
-
       String name;
       if (builder.bitNames != null && i < builder.bitNames.length) {
         name = builder.bitNames[i];
       } else {
 
         // Name the Drillbit by default. Most tests use one Drillbit,
         // so make the name simple: "drillbit." Only add a numeric suffix
         // when the test creates multiple bits.
-
         if (bitCount == 1) {
           name = DEFAULT_BIT_NAME;
         } else {
@@ -250,7 +240,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
 
       // Remember the first Drillbit, this is the default one returned from
       // drillbit().
-
       if (i == 0) {
         defaultDrillbit = bit;
       }
@@ -330,7 +319,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
    * @return a test client. Client will be closed when this cluster
    * fixture closes, or can be closed early
    */
-
   public ClientFixture client(String host, int port) {
     return clientBuilder()
         .property(DrillProperties.DRILLBIT_CONNECTION, String.format("%s:%d", host, port))
@@ -573,6 +561,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     props.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, dirTestWatcher.getHomeDir().getAbsolutePath());
     props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
     props.setProperty(ExecConstants.UDF_DIRECTORY_FS, FileSystem.DEFAULT_FS);
+    // ALTER SESSION profiles are seldom interesting
+    props.setProperty(ExecConstants.SKIP_ALTER_SESSION_QUERY_PROFILE, Boolean.TRUE.toString());
 
     builder.configBuilder.configProps(props);
     return builder;
@@ -640,7 +630,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
    * @param value the value to encode
    * @return the SQL-acceptable string equivalent
    */
-
   public static String stringify(Object value) {
     if (value == null) {
       return null;
@@ -654,8 +643,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   }
 
   public static String getResource(String resource) throws IOException {
-    // Unlike the Java routines, Guava does not like a leading slash.
+
+    // Unlike the Java routines, Guava does not like a leading slash.
     final URL url = Resources.getResource(trimSlash(resource));
     if (url == null) {
       throw new IOException(

diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 24beb3a..9bc5312 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -214,7 +214,6 @@ public class ClusterFixtureBuilder {
     usingZk = true;
 
     // Using ZK. Turn refresh wait back on.
-
     return configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
   }
 
@@ -236,7 +235,6 @@ public class ClusterFixtureBuilder {
     usingZk = true;
 
     // Using ZK. Turn refresh wait back on.
-
     configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
     return this;
   }

diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index e1b5d99..40fe40e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.Objects;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -393,27 +394,9 @@ public class MaterializedField {
    */
  public String toString(boolean includeChildren) {
     final int maxLen = 10;
-    final StringBuilder builder = new StringBuilder();
-    builder
-        .append("[`")
-        .append(name)
-        .append("` (")
-        .append(type.getMinorType().name());
-
-    if (type.hasPrecision() && (type.getPrecision() > 0 || Types.isDecimalType(type))) {
-      builder.append("(");
-      builder.append(type.getPrecision());
-      if (type.hasScale() && type.getScale() > 0) {
-        builder.append(", ");
-        builder.append(type.getScale());
-      }
-      builder.append(")");
-    }
-
-    builder
-        .append(":")
-        .append(type.getMode().name())
-        .append(")");
+    final StringBuilder builder = new StringBuilder()
+        .append("[`");
+    prefix(builder);
 
     if (includeChildren) {
       if (type.getSubTypeCount() > 0) {
@@ -436,17 +419,63 @@ public class MaterializedField {
         .toString();
   }
 
+  private void prefix(StringBuilder builder) {
+    builder
+        .append(name)
+        .append("` (")
+        .append(type.getMinorType().name());
+
+    if (type.hasPrecision() && (type.getPrecision() > 0 || Types.isDecimalType(type))) {
+      builder.append("(");
+      builder.append(type.getPrecision());
+      if (type.hasScale() && type.getScale() > 0) {
+        builder.append(", ");
+        builder.append(type.getScale());
+      }
+      builder.append(")");
+    }
+
+    builder
+        .append(":")
+        .append(type.getMode().name())
+        .append(")");
+  }
+
   @Override
   public String toString() {
     return toString(true);
   }
 
+  public String format() {
+    final StringBuilder builder = new StringBuilder();
+    format(builder, 0);
+    return builder.toString();
+  }
+
+  /**
+   * Format the field in a multi-line format, with children (but not subtypes)
+   * indented. Useful for wide rows where the single-line format is too hard
+   * to read.
+   */
+  public void format(StringBuilder builder, int level) {
+    builder.append(StringUtils.repeat(' ', level));
+    prefix(builder);
+    if (children != null && ! children.isEmpty()) {
+      builder.append(":\n");
+      for (MaterializedField child : children) {
+        child.format(builder, level + 1);
+      }
+    } else {
+      builder.append("\n");
+    }
+  }
+
   /**
    * Return true if two fields have identical MinorType and Mode.
    */
   public boolean hasSameTypeAndMode(MaterializedField that) {
-    return (getType().getMinorType() == that.getType().getMinorType())
-        && (getType().getMode() == that.getType().getMode());
+    return getType().getMinorType() == that.getType().getMinorType()
+        && getType().getMode() == that.getType().getMode();
   }
 
   private String toString(Collection<?> collection, int maxLen) {