http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index 26d881d..f6e11c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -63,8 +63,9 @@ public class SingleMergeExchange extends AbstractExchange {
   protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations)
     throws PhysicalOperatorSetupException {
-    if (receiverLocations.size() != 1)
+    if (receiverLocations.size() != 1) {
       throw new PhysicalOperatorSetupException("SingleMergeExchange only supports a single receiver endpoint");
+    }
     receiverLocation = receiverLocations.iterator().next();
   }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index cafdbdd..bf2b4a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -48,7 +48,9 @@ public class UnionExchange extends AbstractExchange{
   @Override
   protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
-    if(receiverLocations.size() != 1) throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+    if (receiverLocations.size() != 1) {
+      throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+    }
     this.destinationLocation = receiverLocations.iterator().next();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 7f97624..e25f1c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -41,9 +41,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private RootExec root = null;

-  private ImplCreator(){}
+  private ImplCreator() {}

-  private RootExec getRoot(){
+  private RootExec getRoot() {
     return root;
   }

@@ -78,7 +78,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     ImplCreator i = new ImplCreator();
-    if(AssertionUtil.isAssertionsEnabled()){
+    if (AssertionUtil.isAssertionsEnabled()) {
       root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
     }

@@ -86,9 +86,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     watch.start();
     root.accept(i, context);
     logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
-    if (i.root == null)
+    if (i.root == null) {
       throw new ExecutionSetupException(
           "The provided fragment did not have a root node that correctly created a RootExec value.");
+    }
     return i.getRoot();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
index 8c768e5..82a9a63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
@@ -42,7 +42,9 @@ public class OperatorCreatorRegistry {
   public synchronized Object getOperatorCreator(Class<?> operator) throws ExecutionSetupException {
     Object opCreator = instanceRegistry.get(operator);
-    if (opCreator != null) return opCreator;
+    if (opCreator != null) {
+      return opCreator;
+    }

     Constructor<?> c = constructorRegistry.get(operator);
     if(c == null) {
@@ -75,9 +77,9 @@ public class OperatorCreatorRegistry {
           Type[] args = ((ParameterizedType)iface).getActualTypeArguments();
           interfaceFound = true;
           boolean constructorFound = false;
-          for(Constructor<?> constructor : operatorClass.getConstructors()){
+          for (Constructor<?> constructor : operatorClass.getConstructors()) {
             Class<?>[] params = constructor.getParameterTypes();
-            if(params.length == 0){
+            if (params.length == 0) {
               Constructor<?> old = constructorRegistry.put((Class<?>) args[0], constructor);
               if (old != null) {
                 throw new RuntimeException(
@@ -88,7 +90,7 @@ public class OperatorCreatorRegistry {
               constructorFound = true;
             }
           }
-          if(!constructorFound){
+          if (!constructorFound) {
             logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor",
                 operatorClass.getCanonicalName());
           }
@@ -97,4 +99,5 @@ public class OperatorCreatorRegistry {
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index c2a03b9..2712e27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,8 +83,9 @@ public class ScanBatch implements RecordBatch {
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
     this.context = context;
     this.readers = readers;
-    if (!readers.hasNext())
+    if (!readers.hasNext()) {
       throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+    }
     this.currentReader = readers.next();
     this.oContext = new OperatorContext(subScanConfig, context);
     this.currentReader.setOperatorContext(this.oContext);
@@ -121,7 +122,7 @@ public class ScanBatch implements RecordBatch {
   @Override
   public void kill(boolean sendUpstream) {
-    if(currentReader != null){
+    if (currentReader != null) {
       currentReader.cleanup();
     }

@@ -220,8 +221,8 @@ public class ScanBatch implements RecordBatch {
   private void addPartitionVectors() throws ExecutionSetupException{
     try {
-      if(partitionVectors != null){
-        for(ValueVector v : partitionVectors){
+      if (partitionVectors != null) {
+        for (ValueVector v : partitionVectors) {
           v.clear();
         }
       }
@@ -290,7 +291,9 @@ public class ScanBatch implements RecordBatch {
       if (v == null || v.getClass() != clazz) {
         // Field does not exist add it to the map and the output container
         v = TypeHelper.getNewVector(field, oContext.getAllocator());
-        if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+        if (!clazz.isAssignableFrom(v.getClass())) {
+          throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+        }
         container.add(v);
         fieldVectorMap.put(field.key(), v);
@@ -342,9 +345,9 @@ public class ScanBatch implements RecordBatch {
     return WritableBatch.get(this);
   }

-  public void cleanup(){
+  public void cleanup() {
     container.clear();
-    for(ValueVector v : partitionVectors){
+    for (ValueVector v : partitionVectors) {
       v.clear();
     }
     fieldVectorMap.clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 2b7fdf3..352deae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -79,7 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     @Override
     public boolean innerNext() {
-      if(!ok){
+      if (!ok) {
         incoming.kill(false);

         return false;
@@ -93,7 +93,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
         out = IterOutcome.NONE;
       }
//      logger.debug("Outcome of sender next {}", out);
-      switch(out){
+      switch (out) {
       case STOP:
       case NONE:
         FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(),
@@ -158,7 +158,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     @Override
     public void success(Ack value, ByteBuf buf) {
       sendCount.decrement();
-      if(value.getOk()) return;
+      if (value.getOk()) {
+        return;
+      }

       logger.error("Downstream fragment was not accepted. Stopping future sends.");
       // if we didn't get ack ok, we'll need to kill the query.
@@ -170,5 +172,4 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }

   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 6eede30..473e3a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -132,10 +132,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   @Override
   public IterOutcome innerNext() {
-    if(schema != null){
-      if(getSelectionVector4().next()){
+    if (schema != null) {
+      if (getSelectionVector4().next()) {
         return IterOutcome.OK;
-      }else{
+      } else {
         return IterOutcome.NONE;
       }
     }
@@ -156,8 +156,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           return upstream;
         case OK_NEW_SCHEMA:
           // only change in the case that the schema truly changes. Artificial schema changes are ignored.
-          if(!incoming.getSchema().equals(schema)){
-            if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+          if (!incoming.getSchema().equals(schema)) {
+            if (schema != null) {
+              throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+            }
             this.schema = incoming.getSchema();
           }
           // fall through.
@@ -181,7 +183,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
        }
      }

-      if (schema == null){
+      if (schema == null) {
        // builder may be null at this point if the first incoming batch is empty
        return IterOutcome.NONE;
      }
@@ -196,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
      return IterOutcome.OK_NEW_SCHEMA;

-    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+    } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
      kill(false);
      logger.error("Failure during query", ex);
      context.fail(ex);
@@ -215,7 +217,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
    if (copier == null) {
      copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch);
    } else {
-      for(VectorWrapper<?> i : batch){
+      for (VectorWrapper<?> i : batch) {

        ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
        newContainer.add(v);
@@ -227,7 +229,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
    int count = selectionVector4.getCount();
    int copiedRecords = copier.copyRecords(0, count);
    assert copiedRecords == count;
-    for(VectorWrapper<?> v : newContainer){
+    for (VectorWrapper<?> v : newContainer) {
      ValueVector.Mutator m = v.getValueVector().getMutator();
      m.setValueCount(count);
    }
@@ -253,11 +255,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
    ClassGenerator<PriorityQueue> g = cg.getRoot();
    g.setMappingSet(mainMapping);

-    for(Ordering od : orderings){
+    for (Ordering od : orderings) {
      // first, we rewrite the evaluation stack for each side of the comparison.
      ErrorCollector collector = new ErrorCollectorImpl();
      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
-      if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      if (collector.hasErrors()) {
+        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      }
      g.setMappingSet(leftMapping);
      HoldingContainer left = g.addExpr(expr, false);
      g.setMappingSet(rightMapping);
@@ -269,9 +273,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
      HoldingContainer out = g.addExpr(fh, false);
      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));

-      if(od.getDirection() == Direction.ASCENDING){
+      if (od.getDirection() == Direction.ASCENDING) {
        jc._then()._return(out.getValue());
-      }else{
+      } else {
        jc._then()._return(out.getValue().minus());
      }
      g.rotateBlock();
@@ -377,5 +381,4 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
    }

  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 58dd247..92d1882 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -82,10 +82,12 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
        }

        /* Inject trace operator */
-        if (list.size() > 0)
-            newOp = op.getNewWithChildren(list);
-        newOp.setOperatorId(op.getOperatorId());
+        if (list.size() > 0) {
+            newOp = op.getNewWithChildren(list);
+        }
+        newOp.setOperatorId(op.getOperatorId());

        return newOp;
    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 99eeed3..8c1a4c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -82,8 +82,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
    IterOutcome upstream;
    do {
      upstream = next(incoming);
-      if(first && upstream == IterOutcome.OK)
+      if(first && upstream == IterOutcome.OK) {
        upstream = IterOutcome.OK_NEW_SCHEMA;
+      }
      first = false;

      switch(upstream) {
@@ -91,14 +92,15 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
        case NONE:
        case STOP:
          cleanup();
-          if (upstream == IterOutcome.STOP)
+          if (upstream == IterOutcome.STOP) {
            return upstream;
+          }
          break;

        case OK_NEW_SCHEMA:
          try{
            setupNewSchema();
-          }catch(Exception ex){
+          } catch(Exception ex) {
            kill(false);
            logger.error("Failure during query", ex);
            context.fail(ex);
@@ -113,9 +115,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
            throw new RuntimeException(ex);
          }

-          for(VectorWrapper v : incoming)
+          for(VectorWrapper v : incoming) {
            v.getValueVector().clear();
-
+          }
          break;

        default:
@@ -176,4 +178,5 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
      throw new RuntimeException("Failed to close RecordWriter", ex);
    }
  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index e9be2ac..c522870 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -82,7 +82,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
  @Override
  public int getRecordCount() {
-    if(done) return 0;
+    if (done) {
+      return 0;
+    }
    return aggregator.getOutputCount();
  }

@@ -102,7 +104,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
      case STOP:
        return outcome;
      case OK_NEW_SCHEMA:
-        if (!createAggregator()){
+        if (!createAggregator()) {
          done = true;
          return IterOutcome.STOP;
        }
@@ -131,10 +133,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());

-    while(true){
+    while (true) {
      AggOutcome out = aggregator.doWork();
      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
+      switch (out) {
      case CLEANUP_AND_RETURN:
        container.zeroVectors();
        aggregator.cleanup();
@@ -150,7 +152,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
        return aggregator.getOutcome();
      case UPDATE_AGGREGATOR:
        aggregator = null;
-        if(!createAggregator()){
+        if (!createAggregator()) {
          return IterOutcome.STOP;
        }
        continue;
@@ -168,23 +170,23 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   */
  private boolean createAggregator() {
    logger.debug("Creating new aggregator.");
-    try{
+    try {
      stats.startSetup();
      this.aggregator = createAggregatorInternal();
      return true;
-    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+    } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
      context.fail(ex);
      container.clear();
      incoming.kill(false);
      return false;
-    }finally{
+    } finally {
      stats.stopSetup();
    }
  }

  private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
-    CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-    ClassGenerator<HashAggregator> cg = top.getRoot();
+    CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    ClassGenerator<HashAggregator> cg = top.getRoot();
    ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");

    container.clear();
@@ -199,10 +201,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    int i;

-    for(i = 0; i < numGroupByExprs; i++) {
+    for (i = 0; i < numGroupByExprs; i++) {
      NamedExpression ne = popConfig.getGroupByExprs()[i];
      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }

      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
      ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -211,13 +215,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
      groupByOutFieldIds[i] = container.add(vv);
    }

-    for(i = 0; i < numAggrExprs; i++){
+    for (i = 0; i < numAggrExprs; i++) {
      NamedExpression ne = popConfig.getAggrExprs()[i];
      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );

-      if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      if (collector.hasErrors()) {
+        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+      }

-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }

      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
      ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -248,7 +256,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    return agg;
  }

-
  private void setupUpdateAggrValues(ClassGenerator<HashAggregator> cg) {
    cg.setMappingSet(UpdateAggrValuesMapping);

@@ -260,8 +267,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
    cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
  }

-  private void setupGetIndex(ClassGenerator<HashAggregator> cg){
-    switch(incoming.getSchema().getSelectionVectorMode()){
+  private void setupGetIndex(ClassGenerator<HashAggregator> cg) {
+    switch (incoming.getSchema().getSelectionVectorMode()) {
    case FOUR_BYTE: {
      JVar var = cg.declareClassField("sv4_", cg.getModel()._ref(SelectionVector4.class));
      cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index b6b8874..d25a952 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -156,7 +156,9 @@ public abstract class HashAggTemplate implements HashAggregator {
      boolean status = true;
      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
        if (outputRecordValues(i, batchOutputCount) ) {
-          if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+          if (EXTRA_DEBUG_2) {
+            logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+          }
          batchOutputCount++;
          outNumRecordsHolder.value++;
        } else {
@@ -270,31 +272,41 @@ public abstract class HashAggTemplate implements HashAggregator {
    outside: while(true) {
      // loop through existing records, aggregating the values as necessary.
- if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()..."); + if (EXTRA_DEBUG_1) { + logger.debug ("Starting outer loop of doWork()..."); + } for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + if(EXTRA_DEBUG_2) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + } boolean success = checkGroupAndAggrValues(currentIndex); assert success : "HashAgg couldn't copy values."; } - if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex); + if (EXTRA_DEBUG_1) { + logger.debug("Processed {} records", underlyingIndex); + } - try{ + try { - while(true){ + while (true) { // Cleanup the previous batch since we are done processing it. for (VectorWrapper<?> v : incoming) { v.getValueVector().clear(); } IterOutcome out = outgoing.next(0, incoming); - if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out); - switch(out){ + if (EXTRA_DEBUG_1) { + logger.debug("Received IterOutcome of {}", out); + } + switch (out) { case NOT_YET: this.outcome = out; return AggOutcome.RETURN_OUTCOME; case OK_NEW_SCHEMA: - if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); + if (EXTRA_DEBUG_1) { + logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); + } newSchema = true; this.cleanup(); // TODO: new schema case needs to be handled appropriately @@ -302,14 +314,16 @@ public abstract class HashAggTemplate implements HashAggregator { case OK: resetIndex(); - if(incoming.getRecordCount() == 0){ + if (incoming.getRecordCount() == 0) { continue; } else { boolean success = checkGroupAndAggrValues(currentIndex); assert success : "HashAgg couldn't copy values."; incIndex(); - if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop"); + if (EXTRA_DEBUG_1) { + logger.debug("Continuing outside loop"); + } continue outside; } @@ -343,8 +357,10 @@ public abstract class HashAggTemplate implements HashAggregator { // placeholder... 
          }
        }
-      } finally{
-        if(first) first = !first;
+      } finally {
+        if (first) {
+          first = !first;
+        }
      }
    }

@@ -373,7 +389,7 @@ public abstract class HashAggTemplate implements HashAggregator {
  }

  @Override
-  public void cleanup(){
+  public void cleanup() {
    if (htable != null) {
      htable.clear();
      htable = null;
@@ -392,28 +408,28 @@ public abstract class HashAggTemplate implements HashAggregator {
    }
  }

-  private final AggOutcome setOkAndReturn(){
-    if(first){
+  private final AggOutcome setOkAndReturn() {
+    if (first) {
      this.outcome = IterOutcome.OK_NEW_SCHEMA;
-    }else{
+    } else {
      this.outcome = IterOutcome.OK;
    }

-    for(VectorWrapper<?> v : outgoing){
+    for (VectorWrapper<?> v : outgoing) {
      v.getValueVector().getMutator().setValueCount(outputCount);
    }
    return AggOutcome.RETURN_OUTCOME;
  }

-  private final void incIndex(){
+  private final void incIndex() {
    underlyingIndex++;
-    if(underlyingIndex >= incoming.getRecordCount()){
+    if (underlyingIndex >= incoming.getRecordCount()) {
      currentIndex = Integer.MAX_VALUE;
      return;
    }
    currentIndex = getVectorIndex(underlyingIndex);
  }

-  private final void resetIndex(){
+  private final void resetIndex() {
    underlyingIndex = -1;
    incIndex();
  }
@@ -422,7 +438,9 @@ public abstract class HashAggTemplate implements HashAggregator {
    BatchHolder bh = new BatchHolder();
    batchHolders.add(bh);

-    if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+    if (EXTRA_DEBUG_1) {
+      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+    }

    bh.setup();
  }
@@ -465,9 +483,9 @@ public abstract class HashAggTemplate implements HashAggregator {
    outputCount += numOutputRecords;

-    if(first){
+    if (first) {
      this.outcome = IterOutcome.OK_NEW_SCHEMA;
-    }else{
+    } else {
      this.outcome = IterOutcome.OK;
    }
@@ -486,14 +504,14 @@ public abstract class HashAggTemplate implements HashAggregator {
    } else {
      if (!outputKeysStatus) {
        logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex);
-        for(VectorWrapper<?> v : outContainer) {
+        for (VectorWrapper<?> v : outContainer) {
          logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
        }
        context.fail(new Exception("Failed to output keys for current batch !"));
      }
      if (!outputValuesStatus) {
        logger.debug("Failed to output values for current batch index: {} ", outBatchIndex);
-        for(VectorWrapper<?> v : outContainer) {
+        for (VectorWrapper<?> v : outContainer) {
          logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
        }
        context.fail(new Exception("Failed to output values for current batch !"));
@@ -557,7 +575,9 @@ public abstract class HashAggTemplate implements HashAggregator {
    if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
-      if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values");
+      if (EXTRA_DEBUG_2) {
+        logger.debug("Group-by key already present in hash table, updating the aggregate values");
+      }

      // debugging
      //if (holder.value == 100018 || holder.value == 100021) {
@@ -566,7 +586,9 @@ public abstract class HashAggTemplate implements HashAggregator {
    }
    else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
-      if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+      if (EXTRA_DEBUG_2) {
+        logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+      }

      // debugging
      // if (holder.value == 100018 || holder.value == 100021) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 4277f23..238242b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -40,7 +40,7 @@ public interface HashAggregator {
  public static enum AggOutcome {
    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
-  }
+  }

  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
                             OperatorStats stats, BufferAllocator allocator, RecordBatch incoming,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 3e6def1..e690060 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -34,8 +34,8 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
  private final SelectionVector2 sv2;
  private final SelectionVector4 sv4;

-  public InternalBatch(RecordBatch incoming){
-    switch(incoming.getSchema().getSelectionVectorMode()){
+  public InternalBatch(RecordBatch incoming) {
+    switch(incoming.getSchema().getSelectionVectorMode()) {
    case FOUR_BYTE:
      this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
      this.sv2 = null;
@@ -69,13 +69,17 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
    return container.iterator();
  }

-  public void clear(){
-    if(sv2 != null) sv2.clear();
-    if(sv4 != null) sv4.clear();
+  public void clear() {
+    if (sv2 != null) {
+      sv2.clear();
+    }
+    if (sv4 != null) {
+      sv4.clear();
+    }
    container.clear();
  }

-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds) {
    return container.getValueAccessorById(clazz, fieldIds);
  }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 820f722..ced5179 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -67,8 +67,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
  @Override
  public int getRecordCount() {
-    if(done) return 0;
-    if (aggregator == null) return 0;
+    if (done) {
+      return 0;
+    }
+    if (aggregator == null) {
+      return 0;
+    }
    return aggregator.getOutputCount();
  }

@@ -88,7 +92,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
      case STOP:
        return outcome;
      case OK_NEW_SCHEMA:
-        if (!createAggregator()){
+        if (!createAggregator()) {
          done = true;
          return IterOutcome.STOP;
        }
@@ -100,12 +104,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
      }
    }

-    while(true){
+    while (true) {
      AggOutcome out = aggregator.doWork();
      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
+      switch (out) {
      case CLEANUP_AND_RETURN:
-        if (!first) container.zeroVectors();
+        if (!first) {
+          container.zeroVectors();
+        }
        done = true;
        // fall through
      case RETURN_OUTCOME:
@@ -122,7 +128,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
      case UPDATE_AGGREGATOR:
        first = false;
        aggregator = null;
-        if(!createAggregator()){
+        if (!createAggregator()) {
          return IterOutcome.STOP;
        }
        continue;
@@ -142,23 +148,20 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   */
  private boolean createAggregator() {
    logger.debug("Creating new aggregator.");
-    try{
+    try {
      stats.startSetup();
      this.aggregator = createAggregatorInternal();
      return true;
-    }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+    } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
      context.fail(ex);
      container.clear();
      incoming.kill(false);
      return false;
-    }finally{
+    } finally {
      stats.stopSetup();
    }
  }

-
-
-
  private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
    ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
    container.clear();
@@ -169,20 +172,24 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
    ErrorCollector collector = new ErrorCollectorImpl();

-    for(int i =0; i < keyExprs.length; i++){
+    for (int i =0; i < keyExprs.length; i++) {
      NamedExpression ne = popConfig.getKeys()[i];
      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }
      keyExprs[i] = expr;
      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
      ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
      keyOutputIds[i] = container.add(vector);
    }

-    for(int i =0; i < valueExprs.length; i++){
+    for (int i =0; i < valueExprs.length; i++) {
      NamedExpression ne = popConfig.getExprs()[i];
      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
-      if(expr == null) continue;
+      if (expr == null) {
+        continue;
+      }

      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
      ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -190,7 +197,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
      valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
    }

-    if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    if (collector.hasErrors()) {
+      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+    }

    setupIsSame(cg, keyExprs);
    setupIsSameApart(cg, keyExprs);
@@ -207,15 +216,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
    return agg;
  }

-
-
  private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
  private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
  private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);

-  private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+  private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
    cg.setMappingSet(IS_SAME_I1);
-    for(LogicalExpression expr : keyExprs){
+    for (LogicalExpression expr : keyExprs) {
      // first, we rewrite the evaluation stack for each side of the comparison.
      cg.setMappingSet(IS_SAME_I1);
      HoldingContainer first = cg.addExpr(expr, false);
@@ -234,9 +241,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
  private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
  private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);

-  private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+  private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
    cg.setMappingSet(ISA_B1);
-    for(LogicalExpression expr : keyExprs){
+    for (LogicalExpression expr : keyExprs) {
      // first, we rewrite the evaluation stack for each side of the comparison.
      cg.setMappingSet(ISA_B1);
      HoldingContainer first = cg.addExpr(expr, false);
@@ -254,9 +261,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
  private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
  private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);

-  private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs){
+  private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
    cg.setMappingSet(EVAL);
-    for(LogicalExpression ex : valueExprs){
+    for (LogicalExpression ex : valueExprs) {
      HoldingContainer hc = cg.addExpr(ex);
      cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
    }
@@ -265,9 +272,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
  private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));

-  private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+  private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
    cg.setMappingSet(RECORD_KEYS);
-    for(int i =0; i < keyExprs.length; i++){
+    for (int i =0; i < keyExprs.length; i++) {
      HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true));
      cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
    }
@@ -280,10 +287,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
  private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null);
  private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS);

-  private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+  private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
    cg.setMappingSet(RECORD_KEYS_PREV);

-    for(int i =0; i < keyExprs.length; i++){
+    for (int i =0; i < keyExprs.length; i++) {
      // IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same. This is possible because InternalBatch guarantees this.
logger.debug("Writing out expr {}", keyExprs[i]); cg.rotateBlock(); @@ -297,8 +304,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE); } - private void getIndex(ClassGenerator<StreamingAggregator> g){ - switch(incoming.getSchema().getSelectionVectorMode()){ + private void getIndex(ClassGenerator<StreamingAggregator> g) { + switch (incoming.getSchema().getSelectionVectorMode()) { case FOUR_BYTE: { JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class)); g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4")); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 53ac1ed..c2a5715 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -60,7 +60,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { private void allocateOutgoing() { - for(VectorWrapper<?> w : outgoing){ + for (VectorWrapper<?> w : outgoing) { w.getValueVector().allocateNew(); } } @@ -75,7 +75,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return outputCount; } - private AggOutcome tooBigFailure(){ + private AggOutcome tooBigFailure() { context.fail(new Exception(TOO_BIG_ERROR)); this.outcome = IterOutcome.STOP; return AggOutcome.CLEANUP_AND_RETURN; @@ -87,11 +87,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { outcome = IterOutcome.NONE; return AggOutcome.CLEANUP_AND_RETURN; } - try{ // outside loop to ensure that first is set to false after the first run. + try { // outside loop to ensure that first is set to false after the first run. outputCount = 0; // if we're in the first state, allocate outgoing. - if(first){ + if (first) { allocateOutgoing(); } @@ -119,8 +119,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } // pick up a remainder batch if we have one. - if(remainderBatch != null){ - if (!outputToBatch( previousIndex )) return tooBigFailure(); + if (remainderBatch != null) { + if (!outputToBatch( previousIndex )) { + return tooBigFailure(); + } remainderBatch.clear(); remainderBatch = null; return setOkAndReturn(); @@ -131,38 +133,56 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { if (pendingOutput) { allocateOutgoing(); pendingOutput = false; - if(EXTRA_DEBUG) logger.debug("Attempting to output remainder."); - if (!outputToBatch( previousIndex)) return tooBigFailure(); + if (EXTRA_DEBUG) { + logger.debug("Attempting to output remainder."); + } + if (!outputToBatch( previousIndex)) { + return tooBigFailure(); + } } - if(newSchema){ + if (newSchema) { return AggOutcome.UPDATE_AGGREGATOR; } - if(lastOutcome != null){ + if (lastOutcome != null) { outcome = lastOutcome; return AggOutcome.CLEANUP_AND_RETURN; } - outside: while(true){ + outside: while(true) { // loop through existing records, adding as necessary. 
        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          if (EXTRA_DEBUG) {
+            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+          }
          if (previousIndex == -1) {
-            if (EXTRA_DEBUG) logger.debug("Adding the initial row's keys and values.");
+            if (EXTRA_DEBUG) {
+              logger.debug("Adding the initial row's keys and values.");
+            }
            addRecordInc(currentIndex);
          }
          else if (isSame( previousIndex, currentIndex )) {
-            if(EXTRA_DEBUG) logger.debug("Values were found the same, adding.");
+            if (EXTRA_DEBUG) {
+              logger.debug("Values were found the same, adding.");
+            }
            addRecordInc(currentIndex);
          } else {
-            if(EXTRA_DEBUG) logger.debug("Values were different, outputting previous batch.");
+            if (EXTRA_DEBUG) {
+              logger.debug("Values were different, outputting previous batch.");
+            }
            if (outputToBatch(previousIndex)) {
-              if(EXTRA_DEBUG) logger.debug("Output successful.");
+              if (EXTRA_DEBUG) {
+                logger.debug("Output successful.");
+              }
              addRecordInc(currentIndex);
            } else {
-              if(EXTRA_DEBUG) logger.debug("Output failed.");
-              if(outputCount == 0) return tooBigFailure();
+              if (EXTRA_DEBUG) {
+                logger.debug("Output failed.");
+              }
+              if (outputCount == 0) {
+                return tooBigFailure();
+              }

              // mark the pending output but move forward for the next cycle.
              pendingOutput = true;
@@ -178,23 +198,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
      InternalBatch previous = null;

-      try{
-        while(true){
+      try {
+        while (true) {
          if (previous != null) {
            previous.clear();
          }
          previous = new InternalBatch(incoming);
          IterOutcome out = outgoing.next(0, incoming);
-          if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
-          switch(out){
+          if (EXTRA_DEBUG) {
+            logger.debug("Received IterOutcome of {}", out);
+          }
+          switch (out) {
          case NONE:
            done = true;
            lastOutcome = out;
            if (first && addedRecordCount == 0) {
              return setOkAndReturn();
-            } else if(addedRecordCount > 0){
-              if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
-              if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
+            } else if(addedRecordCount > 0) {
+              if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+                remainderBatch = previous;
+              }
+              if (EXTRA_DEBUG) {
+                logger.debug("Received no more batches, returning.");
+              }
              return setOkAndReturn();
            }else{
              if (first && out == IterOutcome.OK) {
@@ -204,17 +230,21 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
              return AggOutcome.CLEANUP_AND_RETURN;
            }

-
-
          case NOT_YET:
            this.outcome = out;
            return AggOutcome.RETURN_OUTCOME;

          case OK_NEW_SCHEMA:
-            if(EXTRA_DEBUG) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
-            if(addedRecordCount > 0){
-              if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
-              if(EXTRA_DEBUG) logger.debug("Wrote out end of previous batch, returning.");
+            if (EXTRA_DEBUG) {
+              logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+            }
+            if (addedRecordCount > 0) {
+              if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+                remainderBatch = previous;
+              }
+              if (EXTRA_DEBUG) {
+                logger.debug("Wrote out end of previous batch, returning.");
+              }
              newSchema = true;
              return setOkAndReturn();
            }
@@ -222,21 +252,27 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
            return AggOutcome.UPDATE_AGGREGATOR;
          case OK:
            resetIndex();
-            if(incoming.getRecordCount() == 0){
+            if (incoming.getRecordCount() == 0) {
              continue;
-            }else{
-              if(previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)){
-                if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding.");
+            } else {
+              if (previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)) {
+                if (EXTRA_DEBUG) {
+                  logger.debug("New value was same as last value of previous batch, adding.");
+                }
                addRecordInc(currentIndex);
                previousIndex = currentIndex;
                incIndex();
-                if(EXTRA_DEBUG) logger.debug("Continuing outside");
+                if (EXTRA_DEBUG) {
+                  logger.debug("Continuing outside");
+                }
                continue outside;
-              }else{ // not the same
-                if(EXTRA_DEBUG) logger.debug("This is not the same as the previous, add record and continue outside.");
+              } else { // not the same
+                if (EXTRA_DEBUG) {
+                  logger.debug("This is not the same as the previous, add record and continue outside.");
+                }
                previousIndex = currentIndex;
-                if(addedRecordCount > 0){
-                  if( !outputToBatchPrev( previous, previousIndex, outputCount) ){
+                if (addedRecordCount > 0) {
+                  if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
                    remainderBatch = previous;
                    return setOkAndReturn();
                  }
@@ -251,72 +287,78 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
            return AggOutcome.CLEANUP_AND_RETURN;
          }
-
        }
-      }finally{
+      } finally {
        // make sure to clear previous if we haven't saved it.
- if(remainderBatch == null && previous != null){ + if (remainderBatch == null && previous != null) { previous.clear(); } } } - }finally{ - if(first) first = !first; + } finally { + if (first) { + first = !first; + } } } - - private final void incIndex(){ + private final void incIndex() { underlyingIndex++; - if(underlyingIndex >= incoming.getRecordCount()){ + if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } currentIndex = getVectorIndex(underlyingIndex); } - private final void resetIndex(){ + private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private final AggOutcome setOkAndReturn(){ - if(first){ + private final AggOutcome setOkAndReturn() { + if (first) { this.outcome = IterOutcome.OK_NEW_SCHEMA; - }else{ + } else { this.outcome = IterOutcome.OK; } - for(VectorWrapper<?> v : outgoing){ + for (VectorWrapper<?> v : outgoing) { v.getValueVector().getMutator().setValueCount(outputCount); } return AggOutcome.RETURN_OUTCOME; } - private final boolean outputToBatch(int inIndex){ + private final boolean outputToBatch(int inIndex) { - if(!outputRecordKeys(inIndex, outputCount)){ - if(EXTRA_DEBUG) logger.debug("Failure while outputting keys {}", outputCount); + if (!outputRecordKeys(inIndex, outputCount)) { + if(EXTRA_DEBUG) { + logger.debug("Failure while outputting keys {}", outputCount); + } return false; } - if(!outputRecordValues(outputCount)){ - if(EXTRA_DEBUG) logger.debug("Failure while outputting values {}", outputCount); + if (!outputRecordValues(outputCount)) { + if (EXTRA_DEBUG) { + logger.debug("Failure while outputting values {}", outputCount); + } return false; } - if(EXTRA_DEBUG) logger.debug("{} values output successfully", outputCount); + if (EXTRA_DEBUG) { + logger.debug("{} values output successfully", outputCount); + } resetValues(); outputCount++; addedRecordCount = 0; return true; } - private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){ + private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) { boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) // && outputRecordValues(outIndex) // && resetValues(); - if(success){ + if (success) { resetValues(); outputCount++; addedRecordCount = 0; @@ -325,17 +367,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return success; } - private void addRecordInc(int index){ + private void addRecordInc(int index) { addRecord(index); this.addedRecordCount++; } @Override - public void cleanup(){ - if(remainderBatch != null) remainderBatch.clear(); + public void cleanup() { + if (remainderBatch != null) { + remainderBatch.clear(); + } } - public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2); public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 8f5f29b..96da00b 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -28,8 +28,8 @@ public interface StreamingAggregator { public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class); public static enum AggOutcome { - RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; - } + RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; + } public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 195d249..f77407e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -53,23 +53,23 @@ public class ChainedHashTable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class); private static final GeneratorMapping KEY_MATCH_BUILD = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping KEY_MATCH_PROBE = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping GET_HASH_BUILD = - GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, + GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping GET_HASH_PROBE = - GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, + GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping SET_VALUE = - GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, + GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping OUTPUT_KEYS = @@ -138,8 +138,12 @@ public class ChainedHashTable { int i = 0; for (NamedExpression ne : htConfig.getKeyExprsBuild()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. 
" + collector.toErrorString()); - if (expr == null) continue; + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + if (expr == null) { + continue; + } keyExprsBuild[i] = expr; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); @@ -155,8 +159,12 @@ public class ChainedHashTable { i = 0; for (NamedExpression ne : htConfig.getKeyExprsProbe()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); - if (expr == null) continue; + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + if (expr == null) { + continue; + } keyExprsProbe[i] = expr; i++; } @@ -293,4 +301,3 @@ public class ChainedHashTable { } } } - http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index b03880c..6024523 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -164,10 +164,11 @@ public abstract class HashTableTemplate implements HashTable { assert (currentIdxWithinBatch < HashTable.BATCH_SIZE); assert (incomingRowIdx < HashTable.BATCH_SIZE); - if (isProbe) + if (isProbe) { match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch); - else + } else { match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch); + } if (! match) { currentIdxHolder.value = links.getAccessor().get(currentIdxWithinBatch); @@ -196,7 +197,9 @@ public abstract class HashTableTemplate implements HashTable { maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch); - if (EXTRA_DEBUG) logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue); + if (EXTRA_DEBUG) { + logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue); + } return true; } @@ -225,7 +228,9 @@ public abstract class HashTableTemplate implements HashTable { newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); - if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + if (EXTRA_DEBUG) { + logger.debug("New bucket was empty. 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index b03880c..6024523 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -164,10 +164,11 @@ public abstract class HashTableTemplate implements HashTable {
       assert (currentIdxWithinBatch < HashTable.BATCH_SIZE);
       assert (incomingRowIdx < HashTable.BATCH_SIZE);
 
-      if (isProbe)
+      if (isProbe) {
         match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch);
-      else
+      } else {
         match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch);
+      }
 
       if (! match) {
         currentIdxHolder.value = links.getAccessor().get(currentIdxWithinBatch);
@@ -196,7 +197,9 @@ public abstract class HashTableTemplate implements HashTable {
 
       maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
 
-      if (EXTRA_DEBUG) logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+      if (EXTRA_DEBUG) {
+        logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+      }
 
       return true;
     }
@@ -225,7 +228,9 @@ public abstract class HashTableTemplate implements HashTable {
         newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
         newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
 
-        if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+        if (EXTRA_DEBUG) {
+          logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+        }
 
       } else {
         // follow the new table's hash chain until we encounter empty slot. Note that the hash chain could
@@ -245,7 +250,9 @@ public abstract class HashTableTemplate implements HashTable {
           newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
           newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
 
-          if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+          if (EXTRA_DEBUG) {
+            logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+          }
 
           break;
         } else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
@@ -253,7 +260,9 @@ public abstract class HashTableTemplate implements HashTable {
           newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this batch to mark end of the hash chain
           newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
-          if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+          if (EXTRA_DEBUG) {
+            logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+          }
           break;
         }
@@ -381,11 +390,19 @@ public abstract class HashTableTemplate implements HashTable {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
-    if (loadf <= 0 || Float.isNaN(loadf)) throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
-    if (initialCap <= 0) throw new IllegalArgumentException("The initial capacity must be greater than 0");
-    if (initialCap > MAXIMUM_CAPACITY) throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+    if (loadf <= 0 || Float.isNaN(loadf)) {
+      throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
+    }
+    if (initialCap <= 0) {
+      throw new IllegalArgumentException("The initial capacity must be greater than 0");
+    }
+    if (initialCap > MAXIMUM_CAPACITY) {
+      throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+    }
 
-    if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+    if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) {
+      throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+    }
 
     this.htConfig = htConfig;
     this.context = context;
@@ -397,8 +414,9 @@ public abstract class HashTableTemplate implements HashTable {
     // round up the initial capacity to nearest highest power of 2
     tableSize = roundUpToPowerOf2(initialCap);
-    if (tableSize > MAXIMUM_CAPACITY)
+    if (tableSize > MAXIMUM_CAPACITY) {
       tableSize = MAXIMUM_CAPACITY;
+    }
 
     threshold = (int) Math.ceil(tableSize * loadf);
@@ -500,7 +518,9 @@ public abstract class HashTableTemplate implements HashTable {
       currentIdx = freeIndex++;
       addBatchIfNeeded(currentIdx);
 
-      if (EXTRA_DEBUG) logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+      if (EXTRA_DEBUG) {
+        logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+      }
 
       if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
         // update the start index array
@@ -543,14 +563,16 @@ public abstract class HashTableTemplate implements HashTable {
       currentIdx = freeIndex++;
       addBatchIfNeeded(currentIdx);
 
-      if (EXTRA_DEBUG) logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+      if (EXTRA_DEBUG) {
+        logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+      }
 
       if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
         htIdxHolder.value = currentIdx;
         return PutStatus.KEY_ADDED;
-      }
-      else
+      } else {
         return PutStatus.PUT_FAILED;
+      }
     }
 
     return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED ;
@@ -618,7 +640,9 @@ public abstract class HashTableTemplate implements HashTable {
     if (currentIdx >= totalBatchSize) {
       BatchHolder bh = addBatchHolder();
-      if (EXTRA_DEBUG) logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+      if (EXTRA_DEBUG) {
+        logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+      }
       return bh;
     } else {
@@ -638,12 +662,15 @@ public abstract class HashTableTemplate implements HashTable {
   // in the new table.. the metadata consists of the startIndices, links and hashValues.
   // Note that the keys stored in the BatchHolders are not moved around.
   private void resizeAndRehashIfNeeded() {
-    if (numEntries < threshold)
+    if (numEntries < threshold) {
       return;
+    }
 
     long t0 = System.currentTimeMillis();
 
-    if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+    if (EXTRA_DEBUG) {
+      logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+    }
 
     // If the table size is already MAXIMUM_CAPACITY, don't resize
     // the table, but set the threshold to Integer.MAX_VALUE such that
@@ -656,8 +683,9 @@ public abstract class HashTableTemplate implements HashTable {
     int newSize = 2 * tableSize;
     tableSize = roundUpToPowerOf2(newSize);
-    if (tableSize > MAXIMUM_CAPACITY)
+    if (tableSize > MAXIMUM_CAPACITY) {
       tableSize = MAXIMUM_CAPACITY;
+    }
 
     // set the new threshold based on the new table size and load factor
     threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor());
@@ -717,5 +745,3 @@ public abstract class HashTableTemplate implements HashTable {
   protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) ;
 }
-
-
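The sizing logic braced above is the classic hash-map recipe: capacity is always a power of two (capped at MAXIMUM_CAPACITY), the resize trigger is ceil(capacity * loadFactor), and resizing doubles the table. A self-contained sketch of that arithmetic — the body of roundUpToPowerOf2 here is a plausible stand-in, since the diff does not show Drill's implementation of it:

  // Illustrative sketch only; MAXIMUM_CAPACITY and roundUpToPowerOf2 mirror the
  // names in the diff, but this body is an assumption, not the Drill source.
  public class HashTableSizingDemo {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    static int roundUpToPowerOf2(int n) {
      int highest = Integer.highestOneBit(n);
      return (n == highest) ? n : highest << 1;   // next power of two >= n
    }

    public static void main(String[] args) {
      float loadf = 0.75f;
      int tableSize = roundUpToPowerOf2(1000);            // -> 1024
      if (tableSize > MAXIMUM_CAPACITY) {
        tableSize = MAXIMUM_CAPACITY;
      }
      int threshold = (int) Math.ceil(tableSize * loadf); // -> 768
      System.out.println("tableSize=" + tableSize + ", threshold=" + threshold);

      // Doubling on resize, as in resizeAndRehashIfNeeded():
      tableSize = roundUpToPowerOf2(2 * tableSize);       // -> 2048
      System.out.println("after resize: tableSize=" + tableSize);
    }
  }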
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index bf00194..f1fcce0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -79,7 +79,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   protected void doWork() {
     int recordCount = incoming.getRecordCount();
     filter.filterBatch(recordCount);
-//    for(VectorWrapper<?> v : container){
+//    for (VectorWrapper<?> v : container) {
 //      ValueVector.Mutator m = v.getValueVector().getMutator();
 //      m.setValueCount(recordCount);
 //    }
@@ -88,8 +88,12 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   @Override
   public void cleanup() {
-    if(sv2 != null) sv2.clear();
-    if(sv4 != null) sv4.clear();
+    if (sv2 != null) {
+      sv2.clear();
+    }
+    if (sv4 != null) {
+      sv4.clear();
+    }
     super.cleanup();
   }
@@ -100,7 +104,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
       sv2.clear();
     }
 
-    switch(incoming.getSchema().getSelectionVectorMode()){
+    switch (incoming.getSchema().getSelectionVectorMode()) {
     case NONE:
       sv2 = new SelectionVector2(oContext.getAllocator());
       this.filter = generateSV2Filterer();
@@ -137,13 +141,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
     final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
 
-    if(collector.hasErrors()){
+    if (collector.hasErrors()) {
      throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
 
     cg.addExpr(new ReturnValueExpression(expr));
 
-//    for(VectorWrapper<?> i : incoming){
+//    for (VectorWrapper<?> i : incoming) {
 //      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
 //      container.add(v);
 //      allocators.add(getAllocator4(v));
@@ -177,13 +181,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
     final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
-    if(collector.hasErrors()){
+    if (collector.hasErrors()) {
       throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
 
     cg.addExpr(new ReturnValueExpression(expr));
 
-    for(VectorWrapper<?> v : incoming){
+    for (VectorWrapper<?> v : incoming) {
       TransferPair pair = v.getValueVector().getTransferPair();
       container.add(pair.getTo());
       transfers.add(pair);
@@ -202,5 +206,4 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
   }
 
-
 }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 15044b8..2a08c05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -457,8 +457,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         return hj;
     }
 
-    private void allocateVectors(){
-        for(VectorWrapper<?> v : container){
+    private void allocateVectors() {
+        for(VectorWrapper<?> v : container) {
             v.getValueVector().allocateNew();
         }
     }
@@ -472,7 +472,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
 
     private void updateStats(HashTable htable) {
-        if(htable == null) return;
+        if (htable == null) {
+            return;
+        }
         htable.getStats(htStats);
         this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
         this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
@@ -488,7 +490,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     @Override
     public void cleanup() {
-        if(hjHelper != null){
+        if (hjHelper != null) {
            hjHelper.clear();
         }
 
@@ -504,4 +506,5 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         left.cleanup();
         right.cleanup();
     }
+
 }
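The FilterRecordBatch hunks above sit on the selection-vector path: in the NONE case the operator allocates a SelectionVector2 and records which row indexes pass the filter, rather than copying row data. A standalone sketch of that idea, with a plain short[] and predicate standing in for Drill's SelectionVector2 and generated Filterer — none of this is the actual Drill API:

  // Illustrative sketch only: filtering via an index ("selection") vector.
  public class SelectionVectorDemo {
    public static void main(String[] args) {
      int[] column = {5, 42, 7, 99, 3};   // pretend value vector
      short[] sv2 = new short[column.length];
      int selected = 0;
      for (int i = 0; i < column.length; i++) {
        if (column[i] > 6) {              // the materialized filter expression
          sv2[selected++] = (short) i;    // record the row index, copy nothing
        }
      }
      // Downstream operators read through the selection vector:
      for (int i = 0; i < selected; i++) {
        System.out.println(column[sv2[i]]);   // 42, 7, 99
      }
    }
  }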
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 785deae..133289e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -94,11 +94,13 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
     boolean success = true;
     while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
       success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
-      if(success){
+      if (success) {
         recordsProcessed++;
         outputRecords++;
-      }else{
-        if(outputRecords == 0) throw new IllegalStateException("Too big to fail.");
+      } else {
+        if (outputRecords == 0) {
+          throw new IllegalStateException("Too big to fail.");
+        }
         break;
       }
     }
@@ -166,11 +168,11 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
           boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
               && projectProbeRecord(recordsProcessed, outputRecords);
-          if(!success){
+          if (!success) {
             // we failed to project. redo this record.
             getNextRecord = false;
             return;
-          }else{
+          } else {
             outputRecords++;
 
             /* Projected single row from the build side with matching key but there
@@ -182,8 +184,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
              * from the probe side. Drain the next row in the probe side.
              */
              recordsProcessed++;
-            }
-            else {
+            } else {
               /* There is more than one row with the same key on the build side
                * don't drain more records from the probe side till we have projected
                * all the rows with this key
@@ -197,10 +198,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
         // If we have a left outer join, project the keys
         if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
           boolean success = projectProbeRecord(recordsProcessed, outputRecords);
-          if(!success){
-            if(outputRecords == 0){
+          if (!success) {
+            if (outputRecords == 0) {
               throw new IllegalStateException("Record larger than single batch.");
-            }else{
+            } else {
               // we've output some records but failed to output this one. return and wait for next call.
               return;
             }
@@ -214,10 +215,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
           hjHelper.setRecordMatched(currentCompositeIdx);
           boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
              && projectProbeRecord(recordsProcessed, outputRecords);
-          if(!success){
-            if(outputRecords == 0){
+          if (!success) {
+            if (outputRecords == 0) {
               throw new IllegalStateException("Record larger than single batch.");
-            }else{
+            } else {
               // we've output some records but failed to output this one. return and wait for next call.
               return;
             }
@@ -264,5 +265,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch, @Named("outgoing") RecordBatch outgoing);
 
   public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+
   public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+
 }
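A recurring shape in the hunks above: a project call returns false when the outgoing batch is full; if nothing has been emitted yet, the single record cannot fit in any batch (hence "Record larger than single batch."), otherwise the operator returns and resumes from the same record on the next call. A standalone sketch of that resume-on-full protocol, with invented names (emit, fillBatch, BATCH_CAPACITY) standing in for Drill's generated projection methods and batch limits:

  // Illustrative sketch only: batch-full handling with resume, not Drill's code.
  public class ResumeOnFullDemo {
    static final int BATCH_CAPACITY = 4;
    int recordsProcessed = 0;            // survives across calls, like the template's field

    // Stand-in for projectBuildRecord/projectProbeRecord: false when batch is full.
    boolean emit(int record, int outIndex) {
      return outIndex < BATCH_CAPACITY;
    }

    /** Returns the number of records emitted into this outgoing batch. */
    int fillBatch(int totalRecords) {
      int outputRecords = 0;
      while (recordsProcessed < totalRecords) {
        if (emit(recordsProcessed, outputRecords)) {
          recordsProcessed++;
          outputRecords++;
        } else {
          if (outputRecords == 0) {
            throw new IllegalStateException("Record larger than single batch.");
          }
          break;                          // resume from this record on the next call
        }
      }
      return outputRecords;
    }

    public static void main(String[] args) {
      ResumeOnFullDemo probe = new ResumeOnFullDemo();
      System.out.println(probe.fillBatch(10));  // 4
      System.out.println(probe.fillBatch(10));  // 4
      System.out.println(probe.fillBatch(10));  // 2
    }
  }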
