http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index 5131772..cb62b27 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -62,7 +62,6 @@ import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.expr.ClassGenerator.BlockType; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.fn.AbstractFuncHolder; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.vector.ValueHolderHelper; import org.apache.drill.exec.vector.complex.reader.FieldReader; @@ -85,12 +84,7 @@ import com.sun.codemodel.JVar; public class EvaluationVisitor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EvaluationVisitor.class); - - private final FunctionImplementationRegistry registry; - - public EvaluationVisitor(FunctionImplementationRegistry registry) { - super(); - this.registry = registry; + public EvaluationVisitor() { } public HoldingContainer addExpr(LogicalExpression e, ClassGenerator<?> generator) { @@ -497,7 +491,6 @@ public class EvaluationVisitor { } if (complex || repeated) { - MajorType finalType = e.getFieldId().getFinalType(); // // JVar complexReader = generator.declareClassField("reader", generator.getModel()._ref(FieldReader.class));
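Note on the hunk above: EvaluationVisitor loses its FunctionImplementationRegistry field and registry-taking constructor; function resolution is instead supplied as a FunctionLookupContext where expressions are materialized (see the ExpressionTreeMaterializer and FunctionGenerationHelper changes below). A minimal sketch of the resulting call pattern, assuming the caller already holds a FunctionLookupContext and a ClassGenerator; the wrapper class and method names are illustrative and not part of this patch:

import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.EvaluationVisitor;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionLookupContext;
import org.apache.drill.exec.record.VectorAccessible;

class EvaluationSketch {
  HoldingContainer generate(LogicalExpression expr, VectorAccessible batch,
                            FunctionLookupContext functionLookupContext,
                            ClassGenerator<?> generator) {
    // Function resolution happens during materialization, against the lookup context.
    ErrorCollector collector = new ErrorCollectorImpl();
    LogicalExpression materialized =
        ExpressionTreeMaterializer.materialize(expr, batch, collector, functionLookupContext);

    // The visitor itself no longer carries a FunctionImplementationRegistry.
    EvaluationVisitor visitor = new EvaluationVisitor();
    return visitor.addExpr(materialized, generator);
  }
}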
http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java index b83350d..6c186db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.expr.fn; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import com.google.common.collect.Lists; @@ -26,7 +25,6 @@ import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FunctionCall; -import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.IfExpression; import org.apache.drill.common.expression.IfExpression.IfCondition; import org.apache.drill.common.expression.LogicalExpression; @@ -34,11 +32,9 @@ import org.apache.drill.common.expression.ValueExpressions.IntExpression; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.HoldingContainerExpression; -import org.apache.calcite.rel.RelFieldCollation.NullDirection; public class FunctionGenerationHelper { public static final String COMPARE_TO_NULLS_HIGH = "compare_to_nulls_high"; @@ -58,7 +54,7 @@ public class FunctionGenerationHelper { * {@code false}) or the highest value (if {@code true}) * @param left ... * @param right ... - * @param registry ... + * @param functionLookupContext ... * @return * FunctionHolderExpression containing the found function implementation */ @@ -66,7 +62,7 @@ public class FunctionGenerationHelper { boolean null_high, HoldingContainer left, HoldingContainer right, - FunctionImplementationRegistry registry) { + FunctionLookupContext functionLookupContext) { final String comparator_name = null_high ? 
COMPARE_TO_NULLS_HIGH : COMPARE_TO_NULLS_LOW; @@ -76,15 +72,14 @@ public class FunctionGenerationHelper { throw new UnsupportedOperationException( formatCanNotCompareMsg(left.getMajorType(), right.getMajorType())); } - LogicalExpression comparisonFunctionExpression = getFunctionExpression(comparator_name, Types.required(MinorType.INT), - registry, left, right); + LogicalExpression comparisonFunctionExpression = getFunctionExpression(comparator_name, left, right); ErrorCollector collector = new ErrorCollectorImpl(); if (!isUnionType(left.getMajorType()) && !isUnionType(right.getMajorType())) { - return ExpressionTreeMaterializer.materialize(comparisonFunctionExpression, null, collector, registry); + return ExpressionTreeMaterializer.materialize(comparisonFunctionExpression, null, collector, functionLookupContext); } else { LogicalExpression typeComparisonFunctionExpression = getTypeComparisonFunction(comparisonFunctionExpression, left, right); - return ExpressionTreeMaterializer.materialize(typeComparisonFunctionExpression, null, collector, registry); + return ExpressionTreeMaterializer.materialize(typeComparisonFunctionExpression, null, collector, functionLookupContext); } } @@ -107,8 +102,7 @@ public class FunctionGenerationHelper { return getOrderingComparator(true, left, right, registry); } - private static LogicalExpression getFunctionExpression( - String name, MajorType returnType, FunctionImplementationRegistry registry, HoldingContainer... args) { + private static LogicalExpression getFunctionExpression(String name, HoldingContainer... args) { List<MajorType> argTypes = new ArrayList<MajorType>(args.length); List<LogicalExpression> argExpressions = new ArrayList<LogicalExpression>(args.length); for(HoldingContainer c : args) { http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java index 9ca6dbd..20b1d12 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionInitializer.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.expr.fn; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.StringReader; @@ -24,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.util.FileUtils; +import org.apache.drill.common.util.DrillFileUtils; import org.codehaus.commons.compiler.CompileException; import org.codehaus.janino.Java.CompilationUnit; import org.codehaus.janino.Parser; @@ -128,7 +129,7 @@ public class FunctionInitializer { private CompilationUnit convertToCompilationUnit(Class<?> clazz) throws IOException { String path = clazz.getName(); path = path.replaceFirst("\\$.*", ""); - path = path.replace(".", FileUtils.separator); + path = path.replace(".", DrillFileUtils.SEPARATOR); path = "/" + path + ".java"; logger.trace("Loading function code from the {}", path); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java index c583664..9d9020a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java @@ -20,22 +20,58 @@ package org.apache.drill.exec.physical.impl.TopN; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; public interface PriorityQueue { - public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException; - public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException; - public void generate() throws SchemaChangeException; - public VectorContainer getHyperBatch(); - public SelectionVector4 getHeapSv4(); - public SelectionVector4 getFinalSv4(); - public boolean validate(); - public void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException; - public void cleanup(); - - public static TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueue>(PriorityQueue.class, PriorityQueueTemplate.class); + /** + * The elements in the given batch are added to the priority queue. Note that the priority queue + * only retains the top elements that fit within the size specified by the {@link #init(int, BufferAllocator, boolean)} + * method. + * @param batch The batch containing elements we want to add. + * @throws SchemaChangeException + */ + void add(RecordBatchData batch) throws SchemaChangeException; + /** + * Initializes the priority queue. This method must be called before any other methods on the priority + * queue are called. + * @param limit The size of the priority queue. + * @param allocator The {@link BufferAllocator} to use when creating the priority queue. + * @param hasSv2 True when incoming batches have 2 byte selection vectors. False otherwise. + * @throws SchemaChangeException + */ + void init(int limit, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException; + + /** + * This method must be called before fetching the final priority queue hyper batch and final Sv4 vector. + * @throws SchemaChangeException + */ + void generate() throws SchemaChangeException; + + /** + * Retrieves the final priority queue HyperBatch containing the results. <b>Note:</b> this should be called + * after {@link #generate()}. + * @return The final priority queue HyperBatch containing the results. + */ + VectorContainer getHyperBatch(); + + SelectionVector4 getSv4(); + + /** + * Retrieves the selection vector used to select the elements in the priority queue from the hyper batch + * provided by the {@link #getHyperBatch()} method. <b>Note:</b> this should be called after {@link #generate()}. + * @return The selection vector used to select the elements in the priority queue. 
+ */ + SelectionVector4 getFinalSv4(); + + void resetQueue(VectorContainer container, SelectionVector4 vector4) throws SchemaChangeException; + + /** + * Releases all the memory consumed by the priority queue. + */ + void cleanup(); + + TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(PriorityQueue.class, PriorityQueueTemplate.class); } http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java index b82dfc8..7f9aca4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java @@ -44,7 +44,6 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { private SelectionVector4 heapSv4; //This holds the heap private SelectionVector4 finalSv4; //This is for final sorted output private ExpandableHyperContainer hyperBatch; - private FragmentContext context; private BufferAllocator allocator; private int limit; private int queueSize = 0; @@ -52,9 +51,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { private boolean hasSv2; @Override - public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException { + public void init(int limit, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException { this.limit = limit; - this.context = context; this.allocator = allocator; @SuppressWarnings("resource") final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1)); @@ -63,9 +61,6 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { } @Override - public boolean validate() { return true; } - - @Override public void resetQueue(VectorContainer container, SelectionVector4 v4) throws SchemaChangeException { assert container.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE; BatchSchema schema = container.getSchema(); @@ -89,12 +84,12 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { ++queueSize; } v4.clear(); - doSetup(context, hyperBatch, null); + doSetup(hyperBatch, null); } @SuppressWarnings("resource") @Override - public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException{ + public void add(RecordBatchData batch) throws SchemaChangeException{ Stopwatch watch = Stopwatch.createStarted(); if (hyperBatch == null) { hyperBatch = new ExpandableHyperContainer(batch.getContainer()); @@ -102,7 +97,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { hyperBatch.addBatch(batch.getContainer()); } - doSetup(context, hyperBatch, null); // may not need to do this every time + doSetup(hyperBatch, null); // may not need to do this every time int count = 0; SelectionVector2 sv2 = null; @@ -146,7 +141,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { } @Override - public SelectionVector4 getHeapSv4() { + public SelectionVector4 getSv4() { return heapSv4; } @@ -226,8 +221,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { return doEval(sv1, sv2); } - public abstract void 
doSetup(@Named("context") FragmentContext context, - @Named("incoming") VectorContainer incoming, + public abstract void doSetup(@Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; public abstract int doEval(@Named("leftIndex") int leftIndex, http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index dcf67d4..34c0f94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -18,18 +18,18 @@ package org.apache.drill.exec.physical.impl.TopN; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.drill.common.DrillAutoCloseables; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; -import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.compile.CodeCompiler; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; @@ -40,6 +40,8 @@ import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.expr.fn.FunctionLookupContext; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; @@ -53,13 +55,13 @@ import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.SchemaUtil; import org.apache.drill.exec.record.SimpleRecordBatch; -import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.server.options.OptionSet; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -67,17 +69,15 @@ import com.google.common.base.Stopwatch; import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; -import static org.bouncycastle.asn1.x500.style.RFC4519Style.l; - public class TopNBatch extends AbstractRecordBatch<TopN> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class); - private final int batchPurgeThreshold; - - 
public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); - public final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); - public final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); + public final MappingSet mainMapping = createMainMappingSet(); + public final MappingSet leftMapping = createLeftMappingSet(); + public final MappingSet rightMapping = createRightMappingSet(); + private final int batchPurgeThreshold; + private final boolean codegenDump; private final RecordBatch incoming; private BatchSchema schema; @@ -95,7 +95,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { super(popConfig, context); this.incoming = incoming; this.config = popConfig; - batchPurgeThreshold = context.getConfig().getInt(ExecConstants.BATCH_PURGE_THRESHOLD); + DrillConfig drillConfig = context.getConfig(); + batchPurgeThreshold = drillConfig.getInt(ExecConstants.BATCH_PURGE_THRESHOLD); + codegenDump = drillConfig.getBoolean(CodeCompiler.ENABLE_SAVE_CODE_FOR_DEBUG_TOPN); } @Override @@ -181,7 +183,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } } - try{ outer: while (true) { Stopwatch watch = Stopwatch.createStarted(); @@ -240,9 +241,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { try { if (priorityQueue == null) { assert !schemaChanged; - priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); + priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit()); } - priorityQueue.add(context, batch); + priorityQueue.add(batch); if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) { purge(); countSincePurge = 0; @@ -290,7 +291,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { VectorContainer c = priorityQueue.getHyperBatch(); VectorContainer newContainer = new VectorContainer(oContext); @SuppressWarnings("resource") - SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4(); + SelectionVector4 selectionVector4 = priorityQueue.getSv4(); SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); if (copier == null) { @@ -332,20 +333,42 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS)); } - public PriorityQueue createNewPriorityQueue(FragmentContext context, List<Ordering> orderings, - VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) - throws ClassTransformationException, IOException, SchemaChangeException{ - CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit) + throws SchemaChangeException, ClassTransformationException, IOException { + return createNewPriorityQueue( + mainMapping, leftMapping, rightMapping, context.getOptionSet(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(), + config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), 
schema.getSelectionVectorMode()); + } + + public static MappingSet createMainMappingSet() { + return new MappingSet((String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); + } + + public static MappingSet createLeftMappingSet() { + return new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); + } + + public static MappingSet createRightMappingSet() { + return new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); + } + + public static PriorityQueue createNewPriorityQueue( + MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping, + OptionSet optionSet, FunctionLookupContext functionLookupContext, CodeCompiler codeCompiler, + List<Ordering> orderings, VectorAccessible batch, boolean unionTypeEnabled, boolean codegenDump, + int limit, BufferAllocator allocator, SelectionVectorMode mode) + throws ClassTransformationException, IOException, SchemaChangeException { + CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, optionSet); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// cg.saveCodeForDebugging(true); + cg.saveCodeForDebugging(codegenDump); ClassGenerator<PriorityQueue> g = cg.getRoot(); g.setMappingSet(mainMapping); for (Ordering od : orderings) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry(), unionTypeEnabled); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, functionLookupContext, unionTypeEnabled); if (collector.hasErrors()) { throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); } @@ -357,8 +380,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { // next we wrap the two comparison sides and add the expression block for the comparison. 
LogicalExpression fh = - FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, - context.getFunctionRegistry()); + FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, functionLookupContext); HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); @@ -373,8 +395,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { g.rotateBlock(); g.getEvalBlock()._return(JExpr.lit(0)); - PriorityQueue q = context.getImplementationClass(cg); - q.init(config.getLimit(), context, oContext.getAllocator(), schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE); + PriorityQueue q = codeCompiler.createInstance(cg); + q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE); return q; } @@ -390,7 +412,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { final VectorContainer c = priorityQueue.getHyperBatch(); final VectorContainer newContainer = new VectorContainer(oContext); @SuppressWarnings("resource") - final SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4(); + final SelectionVector4 selectionVector4 = priorityQueue.getSv4(); final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null); @@ -417,7 +439,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext); newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE); priorityQueue.cleanup(); - priorityQueue = createNewPriorityQueue(context, config.getOrderings(), newSchemaContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); + priorityQueue = createNewPriorityQueue(newSchemaContainer, config.getLimit()); priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent()); } finally { builder.clear(); @@ -436,7 +458,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { incoming.kill(sendUpstream); } - public static class SimpleSV4RecordBatch extends SimpleRecordBatch { private SelectionVector4 sv4; @@ -459,5 +480,4 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return sv4; } } - } http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 8006276..b3d68d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -223,8 +223,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException { - CodeGenerator<HashAggregator> top = - CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + CodeGenerator<HashAggregator> top = 
CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getOptions()); ClassGenerator<HashAggregator> cg = top.getRoot(); ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder"); top.plainJavaCapable(true); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index e5ba98f..b33dbd6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -273,11 +273,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{ - ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, - context.getFunctionRegistry(), context.getOptions()); + ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); container.clear(); LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()]; http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index db9622f..dd4d76e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -135,7 +135,7 @@ public class ChainedHashTable { public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws ClassTransformationException, IOException, SchemaChangeException { - CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getOptions()); top.plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
// This code is called from generated code, so to step into this code, http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 6dfd311..1bdd097 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -144,7 +144,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ protected Filterer generateSV4Filterer() throws SchemaChangeException { final ErrorCollector collector = new ErrorCollectorImpl(); final List<TransferPair> transfers = Lists.newArrayList(); - final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getOptions()); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry()); if (collector.hasErrors()) { @@ -178,7 +178,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ protected Filterer generateSV2Filterer() throws SchemaChangeException { final ErrorCollector collector = new ErrorCollectorImpl(); final List<TransferPair> transfers = Lists.newArrayList(); - final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions()); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry(), false, unionTypeEnabled); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index e64e919..2aa841b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -301,10 +301,10 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { final ErrorCollector collector = new ErrorCollectorImpl(); final List<TransferPair> transfers = Lists.newArrayList(); - final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
-// cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); final IntHashSet transferFieldIds = new IntHashSet(); final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn())); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 481bea8..7e2859e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -49,7 +49,6 @@ import org.apache.drill.exec.physical.impl.common.IndexPointer; import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; -import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.ExpandableHyperContainer; @@ -398,10 +397,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { - final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// cg.saveCodeForDebugging(true); + // cg.saveCodeForDebugging(true); final ClassGenerator<HashJoinProbe> g = cg.getRoot(); // Generate the code to project build side records http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index a1b8dc2..6d804c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -271,10 +271,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{ - final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
-// cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); final ErrorCollector collector = new ErrorCollectorImpl(); // Generate members and initialization code http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index b390e41..fa8c13a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -43,7 +43,6 @@ import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; -import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.MaterializedField; @@ -221,10 +220,10 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi */ private NestedLoopJoin setupWorker() throws IOException, ClassTransformationException, SchemaChangeException { final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get( - NestedLoopJoin.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + NestedLoopJoin.TEMPLATE_DEFINITION, context.getOptions()); nLJCodeGenerator.plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// nLJCodeGenerator.saveCodeForDebugging(true); + // nLJCodeGenerator.saveCodeForDebugging(true); final ClassGenerator<NestedLoopJoin> nLJClassGenerator = nLJCodeGenerator.getRoot(); // generate doEval http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index ec945d6..a7d3f39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -700,10 +700,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } -// private boolean isOutgoingFull() { -// return outgoingPosition == DEFAULT_ALLOC_RECORD_COUNT; -// } - /** * Creates a generate class which implements the copy and compare methods. 
* @@ -713,7 +709,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException { try { - final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. // cg.saveCodeForDebugging(true); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 63133d4..7f662ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -420,13 +420,12 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing, List<Ordering> orderings, List<ValueVector> localAllocationVectors) throws SchemaChangeException { final ErrorCollector collector = new ErrorCollectorImpl(); - final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, - context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getOptions()); // Note: disabled for now. This may require some debugging: // no tests are available for this operator. -// cg.getCodeGenerator().plainOldJavaCapable(true); + // cg.getCodeGenerator().plainOldJavaCapable(true); // Uncomment out this line to debug the generated code. -// cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); int i = 0; for (Ordering od : orderings) { @@ -592,7 +591,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart final List<TransferPair> transfers = Lists.newArrayList(); final ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot( - OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getOptions()); // Note: disabled for now. This may require some debugging: // no tests are available for this operator. 
// cg.getCodeGenerator().plainOldJavaCapable(true); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 92364e8..7684e94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -265,10 +265,10 @@ public class PartitionSenderRootExec extends BaseRootExec { final ErrorCollector collector = new ErrorCollectorImpl(); final ClassGenerator<Partitioner> cg ; - cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); ClassGenerator<Partitioner> cgInner = cg.getInnerGenerator("OutgoingRecordBatch"); final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 30efeec..3abf0fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -35,7 +35,6 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.fn.CastFunctions; import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.ClassTransformationException; @@ -52,7 +51,6 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.record.AbstractSingleRecordBatch; -import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -61,9 +59,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.record.selection.SelectionVector2; 
-import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; @@ -73,9 +68,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { @@ -326,10 +319,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final ErrorCollector collector = new ErrorCollectorImpl(); final List<TransferPair> transfers = Lists.newArrayList(); - final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); final IntHashSet transferFieldIds = new IntHashSet(); @@ -492,7 +485,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { CodeGenerator<Projector> codeGen = cg.getCodeGenerator(); codeGen.plainJavaCapable(true); // Uncomment out this line to debug the generated code. - // codeGen.saveCodeForDebugging(true); + // codeGen.saveCodeForDebugging(true); this.projector = context.getImplementationClass(codeGen); projector.setup(context, incomingBatch, this, transfers); } catch (ClassTransformationException | IOException e) { http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 152cabb..d711592 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -161,13 +161,13 @@ public class SortBatch extends AbstractRecordBatch<Sort> { public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) throws ClassTransformationException, IOException, SchemaChangeException{ - CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getOptions()); // This operator may be deprecated. No tests exercise it. // There is no way, at present, to verify if the generated code // works with Plain-old Java. -// cg.plainOldJavaCapable(true); + // cg.plainOldJavaCapable(true); // Uncomment out this line to debug the generated code. 
-// cg.saveCodeForDebugging(true); + // cg.saveCodeForDebugging(true); ClassGenerator<Sorter> g = cg.getRoot(); g.setMappingSet(mainMapping); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index b875b66..4304c2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -149,21 +149,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect throw new IllegalStateException(e); } - /* - StringBuilder builder = new StringBuilder(); - for (VectorWrapper w : container) { - builder.append(w.getField().getPath()); - builder.append(" Value capacity: "); - builder.append(w.getValueVector().getValueCapacity()); - if (w.getValueVector() instanceof VariableWidthVector) { - builder.append(" Byte capacity: "); - builder.append(((VariableWidthVector) w.getValueVector()).getByteCapacity()); - builder.append("\n"); - } - } - logger.debug(builder.toString()); - */ - if (copiedRecords < remainingRecordCount) { for(VectorWrapper<?> v : container){ ValueVector.Mutator m = v.getValueVector().getMutator(); @@ -234,7 +219,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } try { - final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry(), context.getOptions()); + final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getOptions()); CopyUtil.generateCopies(cg.getRoot(), incoming, false); Copier copier = context.getImplementationClass(cg); copier.setupRemover(context, incoming, this); @@ -262,7 +247,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } try { - final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry(), context.getOptions()); + final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getOptions()); CopyUtil.generateCopies(cg.getRoot(), batch, true); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
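Looking back at the TopNBatch changes earlier in this patch: priority-queue code generation is now exposed through a static factory whose dependencies (option set, function lookup context, code compiler, allocator) are passed explicitly rather than pulled from a FragmentContext, and the factory already calls init(limit, allocator, hasSv2) before returning. A usage sketch under those assumptions; the wrapper class, method name, and argument values are illustrative only:

import java.io.IOException;
import java.util.List;

import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.fn.FunctionLookupContext;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.TopN.PriorityQueue;
import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.server.options.OptionSet;

class TopNQueueSketch {
  void topN(OptionSet options, FunctionLookupContext functions, CodeCompiler compiler,
            List<Ordering> orderings, VectorAccessible schemaBatch,
            RecordBatchData data, BufferAllocator allocator, int limit)
      throws ClassTransformationException, IOException, SchemaChangeException {
    // Build the generated queue from explicit dependencies; no FragmentContext involved.
    PriorityQueue queue = TopNBatch.createNewPriorityQueue(
        TopNBatch.createMainMappingSet(), TopNBatch.createLeftMappingSet(), TopNBatch.createRightMappingSet(),
        options, functions, compiler, orderings, schemaBatch,
        false /* unionTypeEnabled */, false /* codegenDump */,
        limit, allocator, SelectionVectorMode.NONE);

    // The factory already calls init(limit, allocator, hasSv2), so callers start with add().
    queue.add(data);
    queue.generate();

    // After generate(), results are exposed as a hyper batch plus a selection vector
    // that orders the retained top-"limit" records.
    VectorContainer results = queue.getHyperBatch();
    SelectionVector4 finalOrder = queue.getFinalSv4();

    // ... consume results in the order given by finalOrder, then release the queue's memory.
    queue.cleanup();
  }
}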
http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 4d623cf..761e272 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -165,7 +165,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { transfers.clear(); allocationVectors.clear(); - final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. // cg.getCodeGenerator().saveCodeForDebugging(true); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index 989ea96..f4a9825 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -40,7 +40,6 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.WindowPOP; -import org.apache.drill.exec.physical.impl.project.Projector; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; @@ -333,7 +332,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { TemplateClassDefinition<WindowFramer> definition = useCustomFrame ? 
WindowFramer.FRAME_TEMPLATE_DEFINITION : WindowFramer.NOFRAME_TEMPLATE_DEFINITION; - final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getFunctionRegistry(), context.getOptions()); + final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getOptions()); { // generating framer.isSamePartition() http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index e0cfc7a..c212593 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -693,7 +693,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) throws ClassTransformationException, IOException, SchemaChangeException{ - CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getOptions()); ClassGenerator<MSorter> g = cg.getRoot(); g.setMappingSet(mainMapping); @@ -736,7 +736,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { public SingleBatchSorter createNewSorter(FragmentContext context, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException{ - CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); // This class can generate plain-old Java. // Uncomment out this line to debug the generated code. @@ -783,7 +783,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer, boolean spilling) throws SchemaChangeException { try { if (copier == null) { - CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
// cg.saveCodeForDebugging(true); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java index f592e44..dee24dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java @@ -142,9 +142,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults { } private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) { - CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, - context.getFragmentContext().getFunctionRegistry(), - context.getFragmentContext().getOptionSet()); + CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet()); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java index ab8cc9a..4d21b11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java @@ -80,9 +80,7 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper { private PriorityQueueCopier newCopier(VectorAccessible batch) { // Generate the copier code and obtain the resulting class - CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, - context.getFragmentContext().getFunctionRegistry(), - context.getFragmentContext().getOptionSet()); + CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet()); ClassGenerator<PriorityQueueCopier> g = cg.getRoot(); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
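A minimal sketch of the generated-class setup after this change, assuming only what the hunks above show: CodeGenerator.get(...) and CodeGenerator.getRoot(...) no longer take the function registry, just the template definition and the option set/manager.

    // Sketch of the updated call pattern, using only names visible in the hunks above;
    // the FunctionImplementationRegistry argument is no longer passed.
    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getOptions());
    cg.plainJavaCapable(true);        // this generated class can be compiled as plain Java
    // cg.saveCodeForDebugging(true); // uncomment to save the generated source for inspection
    ClassGenerator<MSorter> g = cg.getRoot();
    g.setMappingSet(mainMapping);     // mappings as in createNewMSorter() above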
http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java index e7a78ed..1d43128 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java @@ -78,8 +78,7 @@ public class SorterWrapper extends BaseSortWrapper { private SingleBatchSorter newSorter(VectorAccessible batch) { CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get( - SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getFunctionRegistry(), - context.getFragmentContext().getOptionSet()); + SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getOptionSet()); ClassGenerator<SingleBatchSorter> g = cg.getRoot(); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index 564aaed..0497cfd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.record; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -26,7 +25,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.drill.common.types.TypeProtos.MajorType; - public class BatchSchema implements Iterable<MaterializedField> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class); @@ -74,7 +72,7 @@ public class BatchSchema implements Iterable<MaterializedField> { return "BatchSchema [fields=" + fields + ", selectionVector=" + selectionVectorMode + "]"; } - public static enum SelectionVectorMode { + public enum SelectionVectorMode { NONE(-1, false), TWO_BYTE(2, true), FOUR_BYTE(4, true); public boolean hasSelectionVector; http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java index 3b8dd0d..bd077fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java @@ -116,6 +116,14 @@ public class SelectionVector4 implements AutoCloseable { } } + public static int getBatchIndex(int sv4Index) { + return (sv4Index >> 16) & 0xFFFF; + } + + public static int getRecordIndex(int sv4Index) { + return (sv4Index) & 0xFFFF; + } + @Override public void close() { clear(); 
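The two static helpers added to SelectionVector4 make the packing of a four-byte selection-vector entry explicit: the upper 16 bits address the batch and the lower 16 bits address the record within that batch. A minimal decoding sketch (the packed value below is purely illustrative):

    // Hypothetical packed entry: batch 3, record 42 (packing follows the shifts shown above).
    int sv4Entry = (3 << 16) | 42;
    int batchIndex  = SelectionVector4.getBatchIndex(sv4Entry);   // (sv4Entry >> 16) & 0xFFFF -> 3
    int recordIndex = SelectionVector4.getRecordIndex(sv4Entry);  // sv4Entry & 0xFFFF         -> 42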
http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index cb3bfd1..4cb1b45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -65,6 +65,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin { public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{ this.config = config; this.lpPersistance = context.getLpPersistence(); + try { fsConf = new Configuration(); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 6f747ea..73cb616 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -49,7 +49,7 @@ import com.google.common.collect.Maps; public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { private static final boolean IS_COMPRESSIBLE = true; - private static final String DEFAULT_NAME = "json"; + public static final String DEFAULT_NAME = "json"; public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) { this(name, context, fsConf, storageConfig, new JSONFormatConfig()); http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java new file mode 100644 index 0000000..689a5bf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.util; + +import java.io.File; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.WorkspaceConfig; + +import org.apache.drill.exec.store.easy.sequencefile.SequenceFileFormatConfig; +import org.apache.drill.exec.store.easy.text.TextFormatPlugin; + +/** + * This class contains utility methods to speed up tests. Some of the production code currently calls this method + * when the production code is executed as part of the test runs. That's the reason why this code has to be in + * production module. + */ +public class StoragePluginTestUtils { + public static final String CP_PLUGIN_NAME = "cp"; + public static final String DFS_PLUGIN_NAME = "dfs"; + + public static final String TMP_SCHEMA = "tmp"; + public static final String DEFAULT_SCHEMA = "default"; + public static final String ROOT_SCHEMA = "root"; + + public static final String DFS_TMP_SCHEMA = DFS_PLUGIN_NAME + "." + TMP_SCHEMA; + public static final String DFS_DEFAULT_SCHEMA = DFS_PLUGIN_NAME + "." + DEFAULT_SCHEMA; + public static final String DFS_ROOT_SCHEMA = DFS_PLUGIN_NAME + "." + ROOT_SCHEMA; + + public static final String UNIT_TEST_PROP_PREFIX = "drillJDBCUnitTests"; + public static final String UNIT_TEST_DFS_TMP_PROP = UNIT_TEST_PROP_PREFIX + "." + DFS_TMP_SCHEMA; + public static final String UNIT_TEST_DFS_DEFAULT_PROP = UNIT_TEST_PROP_PREFIX + "." + DFS_DEFAULT_SCHEMA; + public static final String UNIT_TEST_DFS_ROOT_PROP = UNIT_TEST_PROP_PREFIX + "." + DFS_ROOT_SCHEMA; + + /** + * Update the workspace locations for a plugin. + * + * @param pluginName The plugin to update. + * @param pluginRegistry A plugin registry. + * @param tmpDirPath The directory to use. + */ + public static void updateSchemaLocation(final String pluginName, + final StoragePluginRegistry pluginRegistry, + final File tmpDirPath, + String... schemas) throws ExecutionSetupException { + @SuppressWarnings("resource") + final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(pluginName); + final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig(); + + Map<String, WorkspaceConfig> workspaces = Maps.newHashMap(); + + if (schemas.length == 0) { + schemas = new String[]{TMP_SCHEMA}; + } + + for (String schema: schemas) { + WorkspaceConfig workspaceConfig = pluginConfig.workspaces.get(schema); + String inputFormat = workspaceConfig == null ? 
null: workspaceConfig.getDefaultInputFormat(); + WorkspaceConfig newWorkspaceConfig = new WorkspaceConfig(tmpDirPath.getAbsolutePath(), true, inputFormat); + workspaces.put(schema, newWorkspaceConfig); + } + + pluginConfig.workspaces.putAll(workspaces); + pluginRegistry.createOrUpdate(pluginName, pluginConfig, true); + } + + public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { + configureFormatPlugins(pluginRegistry, CP_PLUGIN_NAME); + configureFormatPlugins(pluginRegistry, DFS_PLUGIN_NAME); + } + + public static void configureFormatPlugins(StoragePluginRegistry pluginRegistry, String storagePlugin) throws ExecutionSetupException { + FileSystemPlugin fileSystemPlugin = (FileSystemPlugin) pluginRegistry.getPlugin(storagePlugin); + FileSystemConfig fileSystemConfig = (FileSystemConfig) fileSystemPlugin.getConfig(); + + TextFormatPlugin.TextFormatConfig textConfig = new TextFormatPlugin.TextFormatConfig(); + textConfig.extensions = ImmutableList.of("txt"); + textConfig.fieldDelimiter = '\u0000'; + fileSystemConfig.formats.put("txt", textConfig); + + TextFormatPlugin.TextFormatConfig ssvConfig = new TextFormatPlugin.TextFormatConfig(); + ssvConfig.extensions = ImmutableList.of("ssv"); + ssvConfig.fieldDelimiter = ' '; + fileSystemConfig.formats.put("ssv", ssvConfig); + + TextFormatPlugin.TextFormatConfig psvConfig = new TextFormatPlugin.TextFormatConfig(); + psvConfig.extensions = ImmutableList.of("tbl"); + psvConfig.fieldDelimiter = '|'; + fileSystemConfig.formats.put("psv", psvConfig); + + SequenceFileFormatConfig seqConfig = new SequenceFileFormatConfig(); + seqConfig.extensions = ImmutableList.of("seq"); + fileSystemConfig.formats.put("sequencefile", seqConfig); + + TextFormatPlugin.TextFormatConfig csvhtestConfig = new TextFormatPlugin.TextFormatConfig(); + csvhtestConfig.extensions = ImmutableList.of("csvh-test"); + csvhtestConfig.fieldDelimiter = ','; + csvhtestConfig.extractHeader = true; + csvhtestConfig.skipFirstLine = true; + fileSystemConfig.formats.put("csvh-test", csvhtestConfig); + + pluginRegistry.createOrUpdate(storagePlugin, fileSystemConfig, true); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java deleted file mode 100644 index a9f178a..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.util; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.dfs.FileSystemConfig; -import org.apache.drill.exec.store.dfs.FileSystemPlugin; -import org.apache.drill.exec.store.dfs.WorkspaceConfig; - -import com.google.common.io.Files; -import org.apache.drill.exec.store.easy.json.JSONRecordReader; - -/** - * This class contains utility methods to speed up tests. Some of the production code currently calls this method - * when the production code is executed as part of the test runs. That's the reason why this code has to be in - * production module. - */ -public class TestUtilities { - // Below two variable values are derived from - // <DRILL_SRC_HOME>/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json. - private static final String dfsPluginName = "dfs"; - private static final String dfsTmpSchema = "tmp"; - - // Below two variable values are derived from - // <DRILL_SRC_HOME>/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json. - private static final String dfsTestPluginName = "dfs_test"; - private static final String dfsTestTmpSchema = "tmp"; - - /** - * Create and removes a temporary folder - * - * @return absolute path to temporary folder - */ - public static String createTempDir() { - final File tmpDir = Files.createTempDir(); - tmpDir.deleteOnExit(); - return tmpDir.getAbsolutePath(); - } - - /** - * Update the location of dfs_test.tmp location. Get the "dfs_test.tmp" workspace and update the location with an - * exclusive temp directory just for use in the current test jvm. - * - * @param pluginRegistry - * @return JVM exclusive temporary directory location. - */ - public static void updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry, - final String tmpDirPath) - throws ExecutionSetupException { - @SuppressWarnings("resource") - final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsTestPluginName); - final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig(); - final WorkspaceConfig tmpWSConfig = pluginConfig.workspaces.get(dfsTestTmpSchema); - - final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDirPath, true, tmpWSConfig.getDefaultInputFormat()); - - pluginConfig.workspaces.remove(dfsTestTmpSchema); - pluginConfig.workspaces.put(dfsTestTmpSchema, newTmpWSConfig); - - pluginRegistry.createOrUpdate(dfsTestPluginName, pluginConfig, true); - } - - /** - * Make the dfs.tmp schema immutable, so that tests writers don't use the dfs.tmp to create views. 
- * Schema "dfs.tmp" added as part of the default bootstrap plugins file that comes with drill-java-exec jar - */ - public static void makeDfsTmpSchemaImmutable(final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { - @SuppressWarnings("resource") - final FileSystemPlugin dfsPlugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsPluginName); - final FileSystemConfig dfsPluginConfig = (FileSystemConfig) dfsPlugin.getConfig(); - final WorkspaceConfig tmpWSConfig = dfsPluginConfig.workspaces.get(dfsTmpSchema); - - final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpWSConfig.getLocation(), false, - tmpWSConfig.getDefaultInputFormat()); - - dfsPluginConfig.workspaces.remove(dfsTmpSchema); - dfsPluginConfig.workspaces.put(dfsTmpSchema, newTmpWSConfig); - - pluginRegistry.createOrUpdate(dfsPluginName, dfsPluginConfig, true); - } - - /** - * Create JSONRecordReader from input strings. - * @param jsonBatches : list of input strings, each element represent a batch. Each string could either - * be in the form of "[{...}, {...}, ..., {...}]", or in the form of "{...}". - * @param fragContext : fragment context - * @param columnsToRead : list of schema paths to read from JSON reader. - * @return - */ - public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) { - ObjectMapper mapper = new ObjectMapper(); - List<RecordReader> readers = new ArrayList<>(); - for (String batchJason : jsonBatches) { - JsonNode records; - try { - records = mapper.readTree(batchJason); - } catch (IOException e) { - throw new RuntimeException(e); - } - readers.add(new JSONRecordReader(fragContext, records, null, columnsToRead)); - } - return readers.iterator(); - } - - /** - * Create JSONRecordReader from files on a file system. - * @param fs : file system. - * @param inputPaths : list of .json file paths. - * @param fragContext - * @param columnsToRead - * @return - */ - public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<String> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) { - List<RecordReader> readers = new ArrayList<>(); - for (String inputPath : inputPaths) { - readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead)); - } - return readers.iterator(); - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/acc5ed92/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index d95f421..a66fce0 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -206,6 +206,7 @@ drill.exec: { } }, compile: { + codegen.debug.topn: false, compiler: "DEFAULT", debug: true, janino_maxsize: 262144,
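A hedged usage sketch for the new StoragePluginTestUtils added above, assuming a test already holds a StoragePluginRegistry from a running test Drillbit; the registry variable and temporary directory are illustrative, not part of this change.

    // registry: a StoragePluginRegistry obtained from the test Drillbit (assumed).
    File testTmpDir = new File(System.getProperty("java.io.tmpdir"), "drill-unit-test");
    // Point the dfs tmp and root workspaces at a per-JVM scratch directory ...
    StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME,
        registry, testTmpDir, StoragePluginTestUtils.TMP_SCHEMA, StoragePluginTestUtils.ROOT_SCHEMA);
    // ... and register the test format configs (txt, ssv, psv, sequencefile, csvh-test).
    StoragePluginTestUtils.configureFormatPlugins(registry, StoragePluginTestUtils.DFS_PLUGIN_NAME);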