http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java index 83e644e..d21100b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java @@ -1,4 +1,3 @@ - /******************************************************************************* * Licensed to the Apache Software Foundation (ASF) under one @@ -37,27 +36,31 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{ private final RepeatedListVector container; private FieldReader reader; - public RepeatedListReaderImpl(String name, RepeatedListVector container){ + public RepeatedListReaderImpl(String name, RepeatedListVector container) { super(); this.name = name; this.container = container; } @Override - public MajorType getType(){ + public MajorType getType() { return TYPE; } @Override - public void copyAsValue(ListWriter writer){ - if(currentOffset == NO_VALUES) return; + public void copyAsValue(ListWriter writer) { + if (currentOffset == NO_VALUES) { + return; + } RepeatedListWriter impl = (RepeatedListWriter) writer; impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container)); } @Override - public void copyAsField(String name, MapWriter writer){ - if(currentOffset == NO_VALUES) return; + public void copyAsField(String name, MapWriter writer) { + if (currentOffset == NO_VALUES) { + return; + } RepeatedListWriter impl = (RepeatedListWriter) writer.list(name); impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container)); } @@ -66,31 +69,35 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{ private int maxOffset; @Override - public int size(){ + public int size() { return maxOffset - currentOffset; } @Override - public void setPosition(int index){ + public void setPosition(int index) { super.setPosition(index); RepeatedListHolder h = new RepeatedListHolder(); container.getAccessor().get(index, h); - if(h.start == h.end){ + if (h.start == h.end) { currentOffset = NO_VALUES; - }else{ + } else { currentOffset = h.start-1; maxOffset = h.end; - if(reader != null) reader.setPosition(currentOffset); + if(reader != null) { + reader.setPosition(currentOffset); + } } } @Override - public boolean next(){ - if(currentOffset +1 < maxOffset){ + public boolean next() { + if (currentOffset +1 < maxOffset) { currentOffset++; - if(reader != null) reader.setPosition(currentOffset); + if (reader != null) { + reader.setPosition(currentOffset); + } return true; - }else{ + } else { currentOffset = NO_VALUES; return false; } @@ -102,22 +109,20 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{ } @Override - public FieldReader reader(){ - if(reader == null){ + public FieldReader reader() { + if (reader == null) { reader = container.get(name, ValueVector.class).getAccessor().getReader(); - if (currentOffset == NO_VALUES) + if (currentOffset == NO_VALUES) { reader = NullReader.INSTANCE; - else + } else { reader.setPosition(currentOffset); + } } return reader; } - public boolean isSet(){ + public boolean isSet() { return true; } - } - -
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java index faf3508..e57e37c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java @@ -1,5 +1,3 @@ - - /******************************************************************************* * Licensed to the Apache Software Foundation (ASF) under one @@ -20,7 +18,6 @@ ******************************************************************************/ package org.apache.drill.exec.vector.complex.impl; - import java.util.Map; import org.apache.drill.common.types.TypeProtos.MajorType; @@ -43,20 +40,20 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ this.vector = vector; } - private void setChildrenPosition(int index){ - for(FieldReader r : fields.values()){ + private void setChildrenPosition(int index) { + for (FieldReader r : fields.values()) { r.setPosition(index); } } @Override - public FieldReader reader(String name){ + public FieldReader reader(String name) { FieldReader reader = fields.get(name); - if(reader == null){ + if (reader == null) { ValueVector child = vector.get(name, ValueVector.class); - if(child == null){ + if (child == null) { reader = NullReader.INSTANCE; - }else{ + } else { reader = child.getAccessor().getReader(); } fields.put(name, reader); @@ -67,8 +64,9 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ @Override public FieldReader reader() { - if (currentOffset == NO_VALUES) + if (currentOffset == NO_VALUES) { return NullReader.INSTANCE; + } setChildrenPosition(currentOffset); return new SingleLikeRepeatedMapReaderImpl(vector, this); @@ -78,7 +76,7 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ private int maxOffset; @Override - public int size(){ + public int size() { if (isNull()) { return 0; } @@ -86,26 +84,26 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ } @Override - public void setPosition(int index){ + public void setPosition(int index) { super.setPosition(index); RepeatedMapHolder h = new RepeatedMapHolder(); vector.getAccessor().get(index, h); - if(h.start == h.end){ + if (h.start == h.end) { currentOffset = NO_VALUES; - }else{ + } else { currentOffset = h.start-1; maxOffset = h.end; setChildrenPosition(currentOffset); } } - public void setSinglePosition(int index, int childIndex){ + public void setSinglePosition(int index, int childIndex) { super.setPosition(index); RepeatedMapHolder h = new RepeatedMapHolder(); vector.getAccessor().get(index, h); - if(h.start == h.end){ + if (h.start == h.end) { currentOffset = NO_VALUES; - }else{ + } else { int singleOffset = h.start + childIndex; assert singleOffset < h.end; currentOffset = singleOffset; @@ -115,11 +113,11 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ } @Override - public boolean next(){ - if(currentOffset +1 < maxOffset){ + public boolean next() { + if (currentOffset +1 < maxOffset) { setChildrenPosition(++currentOffset); return true; - }else{ + } else { currentOffset = NO_VALUES; return false; } @@ -135,12 +133,12 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ } @Override - public MajorType getType(){ + public MajorType getType() { return vector.getField().getType(); } @Override - public java.util.Iterator<String> iterator(){ + public java.util.Iterator<String> iterator() { return vector.fieldNameIterator(); } @@ -150,26 +148,29 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ } @Override - public void copyAsValue(MapWriter writer){ - if(currentOffset == NO_VALUES) return; + public void copyAsValue(MapWriter writer) { + if (currentOffset == NO_VALUES) { + return; + } RepeatedMapWriter impl = (RepeatedMapWriter) writer; impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector)); } - public void copyAsValueSingle(MapWriter writer){ - if(currentOffset == NO_VALUES) return; + public void copyAsValueSingle(MapWriter writer) { + if (currentOffset == NO_VALUES) { + return; + } SingleMapWriter impl = (SingleMapWriter) writer; impl.inform(impl.container.copyFromSafe(currentOffset, impl.idx(), vector)); } @Override - public void copyAsField(String name, MapWriter writer){ - if(currentOffset == NO_VALUES) return; + public void copyAsField(String name, MapWriter writer) { + if (currentOffset == NO_VALUES) { + return; + } RepeatedMapWriter impl = (RepeatedMapWriter) writer.map(name); impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector)); } - } - - http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java index 872c5e3..c2284ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java @@ -29,13 +29,6 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; - - - - - - - @SuppressWarnings("unused") public class SingleListReaderImpl extends AbstractFieldReader{ @@ -44,22 +37,24 @@ public class SingleListReaderImpl extends AbstractFieldReader{ private final AbstractContainerVector container; private FieldReader reader; - public SingleListReaderImpl(String name, AbstractContainerVector container){ + public SingleListReaderImpl(String name, AbstractContainerVector container) { super(); this.name = name; this.container = container; } @Override - public MajorType getType(){ + public MajorType getType() { return TYPE; } @Override - public void setPosition(int index){ + public void setPosition(int index) { super.setPosition(index); - if(reader != null) reader.setPosition(index); + if (reader != null) { + reader.setPosition(index); + } } @Override @@ -68,8 +63,8 @@ public class SingleListReaderImpl extends AbstractFieldReader{ } @Override - public FieldReader reader(){ - if(reader == null){ + public FieldReader reader() { + if (reader == null) { reader = container.get(name, ValueVector.class).getAccessor().getReader(); setPosition(idx()); } @@ -82,16 +77,13 @@ public class SingleListReaderImpl extends AbstractFieldReader{ } @Override - public void copyAsValue(ListWriter writer){ + public void copyAsValue(ListWriter writer) { throw new UnsupportedOperationException("Generic list copying not yet supported. Please resolve to typed list."); } @Override - public void copyAsField(String name, MapWriter writer){ + public void copyAsField(String name, MapWriter writer) { throw new UnsupportedOperationException("Generic list copying not yet supported. Please resolve to typed list."); } - - } - http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/ErrorHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/ErrorHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/ErrorHelper.java index 5679a4f..51b4e32 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/ErrorHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/ErrorHelper.java @@ -35,7 +35,7 @@ public class ErrorHelper { // } public static DrillPBError logAndConvertError(DrillbitEndpoint endpoint, String message, Throwable t, Logger logger, - boolean verbose){ + boolean verbose) { String id = UUID.randomUUID().toString(); DrillPBError.Builder builder = DrillPBError.newBuilder(); builder.setEndpoint(endpoint); @@ -65,7 +65,9 @@ public class ErrorHelper { while (true) { rootCause = t; if (t.getCause() == null || t.getCause() == t - || (t instanceof SqlParseException && t.getCause() instanceof ParseException)) break; + || (t instanceof SqlParseException && t.getCause() instanceof ParseException)) { + break; + } t = t.getCause(); } @@ -78,4 +80,5 @@ public class ErrorHelper { return builder.build(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index a7f3666..0ac606c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -58,10 +58,11 @@ public class ControlHandlerImpl implements ControlMessageHandler { this.bee = bee; } - @Override public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { - if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received bit com message of type {}", rpcType); + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Received bit com message of type {}", rpcType); + } switch (rpcType) { @@ -112,8 +113,6 @@ public class ControlHandlerImpl implements ControlMessageHandler { } - - /* (non-Javadoc) * @see org.apache.drill.exec.work.batch.BitComHandler#startNewRemoteFragment(org.apache.drill.exec.proto.ExecProtos.PlanFragment) */ @@ -124,19 +123,18 @@ public class ControlHandlerImpl implements ControlMessageHandler { ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman()); NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel); - try{ + try { FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson()); FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener); bee.addFragmentRunner(fr); } catch (Exception e) { listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e); } catch (OutOfMemoryError t) { - if(t.getMessage().startsWith("Direct buffer")){ + if (t.getMessage().startsWith("Direct buffer")) { listener.fail(fragment.getHandle(), "Failure due to error", t); - }else{ + } else { throw t; } - } } @@ -145,16 +143,17 @@ public class ControlHandlerImpl implements ControlMessageHandler { * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle) */ @Override - public Ack cancelFragment(FragmentHandle handle){ + public Ack cancelFragment(FragmentHandle handle) { FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle); - - if(manager != null){ + if (manager != null) { // try remote fragment cancel. manager.cancel(); - }else{ + } else { // then try local cancel. FragmentExecutor runner = bee.getFragmentRunner(handle); - if(runner != null) runner.cancel(); + if (runner != null) { + runner.cancel(); + } } return Acks.OK; @@ -164,7 +163,7 @@ public class ControlHandlerImpl implements ControlMessageHandler { FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); FragmentExecutor executor; - if(manager != null) { + if (manager != null) { executor = manager.getRunnable(); } else { // then try local cancel. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index 1d6a709..5fa9ce0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -54,7 +54,7 @@ public class IncomingBuffers implements AutoCloseable { // Determine the total number of incoming streams that will need to be completed before we are finished. int totalStreams = 0; - for(DataCollector bc : fragCounts.values()){ + for (DataCollector bc : fragCounts.values()) { totalStreams += bc.getTotalIncomingFragments(); } assert totalStreams >= remainingRequired.get() : String.format("Total Streams %d should be more than the minimum number of streams to commence (%d). It isn't.", totalStreams, remainingRequired.get()); @@ -74,14 +74,16 @@ public class IncomingBuffers implements AutoCloseable { } return false; } - if(batch.getHeader().getIsLastBatch()){ + if (batch.getHeader().getIsLastBatch()) { streamsRemaining.decrementAndGet(); } int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId(); DataCollector fSet = fragCounts.get(sendMajorFragmentId); - if (fSet == null) throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting. The id was %d. %s", sendMajorFragmentId, Arrays.toString(fragCounts.values().toArray()))); + if (fSet == null) { + throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting. The id was %d. %s", sendMajorFragmentId, Arrays.toString(fragCounts.values().toArray()))); + } try { - synchronized(this){ + synchronized (this) { boolean decremented = fSet.batchArrived(batch.getHeader().getSendingMinorFragmentId(), batch); // we should only return true if remaining required has been decremented and is currently equal to zero. @@ -94,11 +96,13 @@ public class IncomingBuffers implements AutoCloseable { public int getRemainingRequired() { int rem = remainingRequired.get(); - if (rem < 0) return 0; + if (rem < 0) { + return 0; + } return rem; } - public RawBatchBuffer[] getBuffers(int senderMajorFragmentId){ + public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) { return fragCounts.get(senderMajorFragmentId).getBuffers(); } @@ -124,7 +128,7 @@ public class IncomingBuffers implements AutoCloseable { @Override public Void visitOp(PhysicalOperator op, Map<Integer, DataCollector> value) throws RuntimeException { - for(PhysicalOperator o : op){ + for (PhysicalOperator o : op) { o.accept(this, value); } return null; @@ -132,7 +136,7 @@ public class IncomingBuffers implements AutoCloseable { } - public boolean isDone(){ + public boolean isDone() { return streamsRemaining.get() < 1; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index a4ed4d6..bb43b1e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -66,10 +66,10 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ return; } buffer.add(batch); - if(buffer.size() == softlimit){ + if (buffer.size() == softlimit) { overlimit.set(true); readController.enqueueResponse(batch.getSender()); - }else{ + } else { batch.sendOk(); } } @@ -93,14 +93,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } RawFragmentBatch batch; while ((batch = buffer.poll()) != null) { - if (batch.getBody() != null) batch.getBody().release(); + if (batch.getBody() != null) { + batch.getBody().release(); + } } } } @Override public void kill(FragmentContext context) { - while(!buffer.isEmpty()){ + while (!buffer.isEmpty()) { RawFragmentBatch batch = buffer.poll(); batch.getBody().release(); } @@ -115,7 +117,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } @Override - public RawFragmentBatch getNext(){ + public RawFragmentBatch getNext() { if (outOfMemory.get() && buffer.size() < 10) { logger.debug("Setting autoread true"); @@ -128,7 +130,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ b = buffer.poll(); // if we didn't get a buffer, block on waiting for buffer. - if(b == null && (!finished || !buffer.isEmpty())){ + if (b == null && (!finished || !buffer.isEmpty())) { try { b = buffer.take(); } catch (InterruptedException e) { @@ -143,8 +145,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ // if we are in the overlimit condition and aren't finished, check if we've passed the start limit. If so, turn off the overlimit condition and set auto read to true (start reading from socket again). - if(!finished && overlimit.get()){ - if(buffer.size() == startlimit){ + if (!finished && overlimit.get()) { + if (buffer.size() == startlimit) { overlimit.set(false); readController.flushResponses(); } @@ -167,5 +169,4 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 05fc2b1..1e5d8b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -108,14 +108,14 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ this.queryRequest = queryRequest; this.context = new QueryContext(connection.getSession(), queryId, dContext); this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val; - if(queuingEnabled){ + if (queuingEnabled) { int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue(); int largeQueue = context.getOptions().getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue(); this.largeSemaphore = dContext.getClusterCoordinator().getSemaphore("query.large", largeQueue); this.smallSemaphore = dContext.getClusterCoordinator().getSemaphore("query.small", smallQueue); this.queueThreshold = context.getOptions().getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val; this.queueTimeout = context.getOptions().getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val; - }else{ + } else { this.largeSemaphore = null; this.smallSemaphore = null; this.queueThreshold = 0; @@ -138,8 +138,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ return context; } - private boolean isFinished(){ - switch(state.getState()){ + private boolean isFinished() { + switch(state.getState()) { case PENDING: case RUNNING: return false; @@ -150,12 +150,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } private void fail(String message, Throwable t) { - if(isFinished()){ + if(isFinished()) { logger.error("Received a failure message query finished of: {}", message, t); } if (!state.updateState(QueryState.RUNNING, QueryState.FAILED)) { - if (!state.updateState(QueryState.PENDING, QueryState.FAILED)) - logger.warn("Tried to update query state to FAILED, but was not RUNNING"); + if (!state.updateState(QueryState.PENDING, QueryState.FAILED)) { + logger.warn("Tried to update query state to FAILED, but was not RUNNING"); + } } boolean verbose = getContext().getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; @@ -171,7 +172,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } public void cancel() { - if(isFinished()){ + if (isFinished()) { return; } @@ -182,7 +183,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ cleanupAndSendResult(result); } - void cleanupAndSendResult(QueryResult result){ + void cleanupAndSendResult(QueryResult result) { bee.retireForeman(this); initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result), true); state.updateState(QueryState.RUNNING, QueryState.COMPLETED); @@ -206,7 +207,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman"); fragmentManager.getStatus().setStartTime(System.currentTimeMillis()); // convert a run query request into action - try{ + try { switch (queryRequest.getType()) { case LOGICAL: parseAndRunLogicalPlan(queryRequest.getPlan()); @@ -220,23 +221,23 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ default: throw new UnsupportedOperationException(); } - }catch(AssertionError | Exception ex){ + } catch (AssertionError | Exception ex) { fail("Failure while setting up Foreman.", ex); - }catch(OutOfMemoryError e){ + } catch (OutOfMemoryError e) { System.out.println("Out of memory, exiting."); System.out.flush(); System.exit(-1); - }finally{ + } finally { releaseLease(); Thread.currentThread().setName(originalThread); } } - private void releaseLease(){ - if(lease != null){ - try{ + private void releaseLease() { + if (lease != null) { + try { lease.close(); - }catch(Exception e){ + } catch (Exception e) { logger.warn("Failure while releasing lease.", e); }; } @@ -247,18 +248,22 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ try { LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json); - if(logicalPlan.getProperties().resultMode == ResultMode.LOGICAL){ + if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) { fail("Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case you're output mode must be PHYSICAL or EXEC.", new Exception()); } - if(logger.isDebugEnabled()) logger.debug("Logical {}", logicalPlan.unparse(context.getConfig())); + if (logger.isDebugEnabled()) { + logger.debug("Logical {}", logicalPlan.unparse(context.getConfig())); + } PhysicalPlan physicalPlan = convert(logicalPlan); - if(logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL){ + if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) { returnPhysical(physicalPlan); return; } - if(logger.isDebugEnabled()) logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan)); + if (logger.isDebugEnabled()) { + logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan)); + } runPhysicalPlan(physicalPlan); } catch (IOException e) { fail("Failure while parsing logical plan.", e); @@ -267,7 +272,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } - private void returnPhysical(PhysicalPlan plan){ + private void returnPhysical(PhysicalPlan plan) { String jsonPlan = plan.unparse(context.getConfig().getMapper().writer()); runPhysicalPlan(DirectPlan.createDirectPlan(context, new PhysicalFromLogicalExplain(jsonPlan))); } @@ -286,7 +291,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ final SendingAccountor acct; - public SingleListener(){ + public SingleListener() { acct = new SendingAccountor(); acct.increment(); acct.increment(); @@ -304,6 +309,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } + private void parseAndRunPhysicalPlan(String json) { try { PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json); @@ -315,10 +321,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ private void runPhysicalPlan(PhysicalPlan plan) { - - - - if(plan.getProperties().resultMode != ResultMode.EXEC){ + if(plan.getProperties().resultMode != ResultMode.EXEC) { fail(String.format("Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC", plan.getProperties().resultMode), new Exception()); } PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); @@ -334,7 +337,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ int sortCount = 0; for (PhysicalOperator op : plan.getSortedOperators()) { - if (op instanceof ExternalSort) sortCount++; + if (op instanceof ExternalSort) { + sortCount++; + } } if (sortCount > 0) { @@ -356,13 +361,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ try { double size = 0; - for(PhysicalOperator ops : plan.getSortedOperators()){ + for (PhysicalOperator ops : plan.getSortedOperators()) { size += ops.getCost(); } - if(queuingEnabled){ - if(size > this.queueThreshold){ + if (queuingEnabled) { + if (size > this.queueThreshold) { this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); - }else{ + } else { this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); } } @@ -420,13 +425,15 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan); fragmentManager.getStatus().setPlanText(textPlan.value); runPhysicalPlan(plan); - }catch(Exception e){ + } catch(Exception e) { fail("Failure while parsing sql.", e); } } private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException { - if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig())); + if (logger.isDebugEnabled()) { + logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig())); + } return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(new BasicOptimizer.BasicOptimizationContext(context), plan); } @@ -443,7 +450,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ public void close() throws IOException { } - public QueryState getQueryState(){ + public QueryState getQueryState() { return this.state.getState(); } @@ -457,7 +464,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ ForemanManagerListener.this.fail(message, t); } - void cleanupAndSendResult(QueryResult result){ + void cleanupAndSendResult(QueryResult result) { Foreman.this.cleanupAndSendResult(result); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java index f89cec9..45a151e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java @@ -59,7 +59,7 @@ public class QueryStatus { private final PStore<QueryProfile> profileCache; - public QueryStatus(RunQuery query, QueryId id, PStoreProvider provider, Foreman foreman){ + public QueryStatus(RunQuery query, QueryId id, PStoreProvider provider, Foreman foreman) { this.id = id; this.query = query; this.queryId = QueryIdHelper.getQueryId(id); @@ -75,7 +75,7 @@ public class QueryStatus { return fragmentDataSet; } - public void setPlanText(String planText){ + public void setPlanText(String planText) { this.planText = planText; updateCache(); @@ -98,11 +98,11 @@ public class QueryStatus { assert finishedFragments <= totalFragments; } - void add(FragmentData data){ + void add(FragmentData data) { int majorFragmentId = data.getHandle().getMajorFragmentId(); int minorFragmentId = data.getHandle().getMinorFragmentId(); IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId); - if(minorMap == null){ + if (minorMap == null) { minorMap = new IntObjectOpenHashMap<FragmentData>(); fragmentDataMap.put(majorFragmentId, minorMap); } @@ -111,7 +111,7 @@ public class QueryStatus { fragmentDataSet.add(data); } - void update(FragmentStatus status, boolean updateCache){ + void update(FragmentStatus status, boolean updateCache) { int majorFragmentId = status.getHandle().getMajorFragmentId(); int minorFragmentId = status.getHandle().getMinorFragmentId(); fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(status); @@ -120,14 +120,14 @@ public class QueryStatus { } } - public void updateCache(){ + public void updateCache() { QueryState queryState = foreman.getQueryState(); boolean fullStatus = queryState == QueryState.COMPLETED || queryState == QueryState.FAILED; profileCache.put(queryId, getAsProfile(fullStatus)); } @Override - public String toString(){ + public String toString() { return fragmentDataMap.toString(); } @@ -135,12 +135,12 @@ public class QueryStatus { int major; int minor; - public FragmentId(FragmentStatus status){ + public FragmentId(FragmentStatus status) { this.major = status.getHandle().getMajorFragmentId(); this.minor = status.getHandle().getMinorFragmentId(); } - public FragmentId(FragmentData data){ + public FragmentId(FragmentData data) { this.major = data.getHandle().getMajorFragmentId(); this.minor = data.getHandle().getMinorFragmentId(); } @@ -162,42 +162,49 @@ public class QueryStatus { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } FragmentId other = (FragmentId) obj; - if (major != other.major) + if (major != other.major) { return false; - if (minor != other.minor) + } + if (minor != other.minor) { return false; + } return true; } @Override - public String toString(){ + public String toString() { return major + ":" + minor; } } - public QueryProfile getAsProfile(boolean fullStatus){ + public QueryProfile getAsProfile(boolean fullStatus) { QueryProfile.Builder b = QueryProfile.newBuilder(); b.setQuery(query.getPlan()); b.setType(query.getType()); - if(planText != null) b.setPlan(planText); + if (planText != null) { + b.setPlan(planText); + } b.setId(id); if (fullStatus) { - for(int i = 0; i < fragmentDataMap.allocated.length; i++){ - if(fragmentDataMap.allocated[i]){ + for (int i = 0; i < fragmentDataMap.allocated.length; i++) { + if (fragmentDataMap.allocated[i]) { int majorFragmentId = fragmentDataMap.keys[i]; IntObjectOpenHashMap<FragmentData> minorMap = (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i]; MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder(); fb.setMajorFragmentId(majorFragmentId); - for(int v = 0; v < minorMap.allocated.length; v++){ - if(minorMap.allocated[v]){ + for (int v = 0; v < minorMap.allocated.length; v++) { + if (minorMap.allocated[v]) { FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v]; fb.addMinorFragmentProfile(data.getStatus().getProfile()); } @@ -215,4 +222,5 @@ public class QueryStatus { b.setFinishedFragments(finishedFragments); return b.build(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 6b4ee9b..ecc8df2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -49,7 +49,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid private Thread executionThread; private AtomicBoolean closed = new AtomicBoolean(false); - public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener){ + public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) { this.context = context; this.bee = bee; this.rootOperator = rootOperator; @@ -75,7 +75,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid root.receivingFragmentFinished(handle); } - public UserClientConnection getClient(){ + public UserClientConnection getClient() { return context.getConnection(); } @@ -102,7 +102,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid // run the query until root.next returns false. while (state.get() == FragmentState.RUNNING_VALUE) { if (!root.next()) { - if (context.isFailed()){ + if (context.isFailed()) { internalFail(context.getFailureCause()); closeOutResources(false); } else { @@ -125,32 +125,38 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } } - private void closeOutResources(boolean throwFailure){ - if(closed.get()) return; + private void closeOutResources(boolean throwFailure) { + if (closed.get()) { + return; + } - try{ + try { root.stop(); - }catch(RuntimeException e){ - if(throwFailure) throw e; + } catch (RuntimeException e) { + if (throwFailure) { + throw e; + } logger.warn("Failure while closing out resources.", e); } - try{ + try { context.close(); - }catch(RuntimeException e){ - if(throwFailure) throw e; + } catch (RuntimeException e) { + if (throwFailure) { + throw e; + } logger.warn("Failure while closing out resources.", e); } closed.set(true); } - private void internalFail(Throwable excep){ + private void internalFail(Throwable excep) { state.set(FragmentState.FAILED_VALUE); listener.fail(context.getHandle(), "Failure while running fragment.", excep); } - private void updateState(FragmentState update){ + private void updateState(FragmentState update) { state.set(update.getNumber()); listener.stateChanged(context.getHandle(), update); } @@ -172,7 +178,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid return o.hashCode() - this.hashCode(); } - public FragmentContext getContext(){ + public FragmentContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 02dec0a..9798701 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -48,7 +48,7 @@ public class NonRootFragmentManager implements FragmentManager { private List<RemoteConnection> connections = new CopyOnWriteArrayList<>(); public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws FragmentSetupException{ - try{ + try { this.fragment = fragment; DrillbitContext context = bee.getContext(); this.bee = bee; @@ -58,7 +58,7 @@ public class NonRootFragmentManager implements FragmentManager { this.context.setBuffers(buffers); this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman())); - }catch(ExecutionSetupException | IOException e){ + } catch (ExecutionSetupException | IOException e) { throw new FragmentSetupException("Failure while decoding fragment.", e); } } @@ -75,10 +75,14 @@ public class NonRootFragmentManager implements FragmentManager { * @see org.apache.drill.exec.work.fragment.FragmentHandler#getRunnable() */ @Override - public FragmentExecutor getRunnable(){ - synchronized(this){ - if(runner != null) throw new IllegalStateException("Get Runnable can only be run once."); - if(cancel) return null; + public FragmentExecutor getRunnable() { + synchronized(this) { + if (runner != null) { + throw new IllegalStateException("Get Runnable can only be run once."); + } + if (cancel) { + return null; + } runner = new FragmentExecutor(context, bee, root, runnerListener); return this.runner; } @@ -89,10 +93,10 @@ public class NonRootFragmentManager implements FragmentManager { * @see org.apache.drill.exec.work.fragment.FragmentHandler#cancel() */ @Override - public void cancel(){ - synchronized(this){ + public void cancel() { + synchronized(this) { cancel = true; - if(runner != null){ + if (runner != null) { runner.cancel(); } } @@ -125,4 +129,4 @@ public class NonRootFragmentManager implements FragmentManager { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java index 195a2cd..db76057 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java @@ -44,7 +44,7 @@ public class UserWorker{ this.bee = bee; } - public QueryId submitWork(UserClientConnection connection, RunQuery query){ + public QueryId submitWork(UserClientConnection connection, RunQuery query) { UUID uuid = UUID.randomUUID(); QueryId id = QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build(); Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query); @@ -52,31 +52,36 @@ public class UserWorker{ return id; } - public QueryResult getResult(UserClientConnection connection, RequestResults req){ + public QueryResult getResult(UserClientConnection connection, RequestResults req) { Foreman foreman = bee.getForemanForQueryId(req.getQueryId()); - if(foreman == null) return QueryResult.newBuilder().setQueryState(QueryState.UNKNOWN_QUERY).build(); + if (foreman == null) { + return QueryResult.newBuilder().setQueryState(QueryState.UNKNOWN_QUERY).build(); + } return foreman.getResult(connection, req); } - public Ack cancelQuery(QueryId query){ + public Ack cancelQuery(QueryId query) { Foreman foreman = bee.getForemanForQueryId(query); - if(foreman != null){ + if(foreman != null) { foreman.cancel(); } return Acks.OK; } - public Ack cancelFragment(FragmentHandle handle){ + public Ack cancelFragment(FragmentHandle handle) { FragmentExecutor runner = bee.getFragmentRunner(handle); - if(runner != null) runner.cancel(); + if (runner != null) { + runner.cancel(); + } return Acks.OK; } - public SchemaFactory getSchemaFactory(){ + public SchemaFactory getSchemaFactory() { return bee.getContext().getSchemaFactory(); } - public OptionManager getSystemOptions(){ + public OptionManager getSystemOptions() { return bee.getContext().getOptionManager(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index f99c2fa..44ef032 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// This file tells Drill to consider this module when class path scanning. -// This file can also include any supplementary configuration information. +// This file tells Drill to consider this module when class path scanning. +// This file can also include any supplementary configuration information. // This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl" @@ -45,7 +45,7 @@ drill.exec: { threads: 10 } }, - use.ip : false + use.ip : false }, operator: { packages += "org.apache.drill.exec.physical.config" @@ -75,14 +75,14 @@ drill.exec: { } }, zk: { - connect: "localhost:2181", - root: "drill", - refresh: 500, - timeout: 5000, - retry: { - count: 7200, - delay: 500 - } + connect: "localhost:2181", + root: "drill", + refresh: 500, + timeout: 5000, + retry: { + count: 7200, + delay: 500 + } }, http: { enabled: true, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/resources/rest/profile/profile.ftl ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl index 3cd214c..8ca21fa 100644 --- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl +++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl @@ -75,10 +75,10 @@ </form> </div> </div> - + <div class="page-header"></div> <h3>Fragment Profiles</h3> - + <div class="panel-group" id="fragment-accordion"> <div class="panel panel-default"> <div class="panel-heading"> @@ -112,10 +112,10 @@ </div> </#list> </div> - + <div class="page-header"></div> <h3>Operator Profiles</h3> - + <div class="panel-group" id="operator-accordion"> <div class="panel panel-default"> <div class="panel-heading"> @@ -149,10 +149,10 @@ </div> </#list> </div> - + <div class="page-header"></div> <h3>Full JSON Profile</h3> - + <div class="span4 collapse-group" id="full-json-profile"> <a class="btn btn-default" data-toggle="collapse" data-target="#full-json-profile-json">JSON profile</a> <br> <br> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/resources/rest/www/graph.js ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/rest/www/graph.js b/exec/java-exec/src/main/resources/rest/www/graph.js index b65c416..55ca7f4 100644 --- a/exec/java-exec/src/main/resources/rest/www/graph.js +++ b/exec/java-exec/src/main/resources/rest/www/graph.js @@ -71,7 +71,7 @@ $(window).load(function () { return r1.end - r1.start > r2.end - r2.start ? 1 : -1; } else return r1.category > r2.category ? 1 : -1; - + }); return timetable; } @@ -106,7 +106,7 @@ $(window).load(function () { fragment: parseInt(ps[i][0].split("-")[0]) }); } - + // edges var st = [ps[0]]; for (var i = 1; i < ps.length; i++) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index f504de4..99c4da5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -95,9 +95,9 @@ public class BaseTestQuery extends ExecTest{ public static void openClient() throws Exception{ config = DrillConfig.create(TEST_CONFIGURATIONS); allocator = new TopLevelAllocator(config); - if(config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)){ + if (config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)) { serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator); - }else{ + } else { serviceSet = RemoteServiceSet.getLocalServiceSet(); } bit = new Drillbit(config, serviceSet); @@ -105,23 +105,31 @@ public class BaseTestQuery extends ExecTest{ client = new DrillClient(config, serviceSet.getCoordinator()); client.connect(); List<QueryResultBatch> results = client.runQuery(QueryType.SQL, String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY)); - for(QueryResultBatch b : results){ + for (QueryResultBatch b : results) { b.release(); } } - protected BufferAllocator getAllocator(){ + protected BufferAllocator getAllocator() { return allocator; } @AfterClass public static void closeClient() throws IOException{ - if(client != null) client.close(); - if(bit != null) bit.close(); - if(serviceSet != null) serviceSet.close(); - if(allocator != null) allocator.close(); + if (client != null) { + client.close(); + } + if (bit != null) { + bit.close(); + } + if(serviceSet != null) { + serviceSet.close(); + } + if (allocator != null) { + allocator.close(); + } } protected void runSQL(String sql) throws Exception { @@ -154,7 +162,7 @@ public class BaseTestQuery extends ExecTest{ return resultListener.await(); } - protected void testWithListener(QueryType type, String query, UserResultsListener resultListener){ + protected void testWithListener(QueryType type, String query, UserResultsListener resultListener) { query = query.replace("[WORKING_PATH]", TestTools.getWorkingPath()); client.runQuery(type, query, resultListener); } @@ -176,8 +184,10 @@ public class BaseTestQuery extends ExecTest{ protected void test(String query) throws Exception{ String[] queries = query.split(";"); - for(String q : queries){ - if(q.trim().isEmpty()) continue; + for (String q : queries) { + if (q.trim().isEmpty()) { + continue; + } testRunAndPrint(QueryType.SQL, q); } } @@ -197,19 +207,22 @@ public class BaseTestQuery extends ExecTest{ protected void testPhysicalFromFile(String file) throws Exception{ testPhysical(getFile(file)); } + protected List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception { return testRunAndReturn(QueryType.PHYSICAL, getFile(file)); } + protected void testLogicalFromFile(String file) throws Exception{ testLogical(getFile(file)); } + protected void testSqlFromFile(String file) throws Exception{ test(getFile(file)); } protected String getFile(String resource) throws IOException{ URL url = Resources.getResource(resource); - if(url == null){ + if (url == null) { throw new IOException(String.format("Unable to find path %s.", resource)); } return Resources.toString(url, Charsets.UTF_8); @@ -245,7 +258,9 @@ public class BaseTestQuery extends ExecTest{ public int waitForCompletion() throws Exception { latch.await(); - if(exception != null) throw exception; + if (exception != null) { + throw exception; + } return count.get(); } } @@ -261,7 +276,7 @@ public class BaseTestQuery extends ExecTest{ protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException { int rowCount = 0; RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - for(QueryResultBatch result : results){ + for(QueryResultBatch result : results) { rowCount += result.getHeader().getRowCount(); loader.load(result.getHeader().getDef(), result.getData()); if (loader.getRecordCount() <= 0) { @@ -279,7 +294,7 @@ public class BaseTestQuery extends ExecTest{ StringBuilder formattedResults = new StringBuilder(); boolean includeHeader = true; RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - for(QueryResultBatch result : results){ + for(QueryResultBatch result : results) { loader.load(result.getHeader().getDef(), result.getData()); if (loader.getRecordCount() <= 0) { break; @@ -294,4 +309,5 @@ public class BaseTestQuery extends ExecTest{ return formattedResults.toString(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java index 6331116..0c75640 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java @@ -101,7 +101,6 @@ public class PlanTestBase extends BaseTestQuery { for (String substr : expectedSubstrs) { assertTrue(planStr.contains(substr)); } - } /** @@ -120,7 +119,6 @@ public class PlanTestBase extends BaseTestQuery { } } - /** * This method will take a SQL string statement, get the PHYSICAL plan in * Optiq RelNode format. Then check the physical plan against the list @@ -149,7 +147,6 @@ public class PlanTestBase extends BaseTestQuery { } } - /* * This will get the plan (either logical or physical) in Optiq RelNode * format, based on SqlExplainLevel and Depth. @@ -205,8 +202,9 @@ public class PlanTestBase extends BaseTestQuery { StringBuilder builder = new StringBuilder(); for (QueryResultBatch b : results) { - if (!b.hasData()) + if (!b.hasData()) { continue; + } loader.load(b.getHeader().getDef(), b.getData()); @@ -247,8 +245,9 @@ public class PlanTestBase extends BaseTestQuery { Stack<Integer> s = new Stack<Integer>(); for (String line : planLines) { - if (line.trim().isEmpty()) + if (line.trim().isEmpty()) { continue; + } if (line.contains(joinKeyWord)) { builder.append(Strings.repeat(" ", 2 * s.size())); builder.append(joinKeyWord + "\n"); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java index aca7d06..7fc7d6b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -63,11 +63,11 @@ public class PlanningBase extends ExecTest{ private final DrillConfig config = DrillConfig.create(); - protected void testSqlPlanFromFile(String file) throws Exception{ + protected void testSqlPlanFromFile(String file) throws Exception { testSqlPlan(getFile(file)); } - protected void testSqlPlan(String sqlCommands) throws Exception{ + protected void testSqlPlan(String sqlCommands) throws Exception { String[] sqlStrings = sqlCommands.split(";"); final DistributedCache cache = new LocalCache(); @@ -133,8 +133,10 @@ public class PlanningBase extends ExecTest{ } }; - for(String sql : sqlStrings){ - if(sql.trim().isEmpty()) continue; + for (String sql : sqlStrings) { + if (sql.trim().isEmpty()) { + continue; + } DrillSqlWorker worker = new DrillSqlWorker(context); PhysicalPlan p = worker.getPlan(sql); } @@ -143,7 +145,7 @@ public class PlanningBase extends ExecTest{ protected String getFile(String resource) throws IOException{ URL url = Resources.getResource(resource); - if(url == null){ + if (url == null) { throw new IOException(String.format("Unable to find path %s.", resource)); } return Resources.toString(url, Charsets.UTF_8); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java index e45b248..828ffe9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java @@ -136,8 +136,9 @@ public class TestProjectPushDown extends PlanTestBase { String query = getFile(fileName); String[] queries = query.split(";"); for (String q : queries) { - if (q.trim().isEmpty()) + if (q.trim().isEmpty()) { continue; + } testPhysicalPlan(q, expectedSubstrs); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java index 29502be..05105fc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java @@ -55,7 +55,7 @@ public class TestOpSerialization { screen.setOperatorId(0); boolean reversed = false; - while(true){ + while (true) { List<PhysicalOperator> pops = Lists.newArrayList(); pops.add(s); @@ -63,7 +63,9 @@ public class TestOpSerialization { pops.add(f); pops.add(screen); - if(reversed) pops = Lists.reverse(pops); + if (reversed) { + pops = Lists.reverse(pops); + } PhysicalPlan plan1 = new PhysicalPlan(PlanProperties.builder().build(), pops); String json = plan1.unparse(c.getMapper().writer()); System.out.println(json); @@ -78,12 +80,12 @@ public class TestOpSerialization { assertEquals(1, o1.getOperatorId()); PhysicalOperator o2 = o1.iterator().next(); assertEquals(2, o2.getOperatorId()); - if(reversed) break; + if(reversed) { + break; + } reversed = !reversed; } - - - } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java index 82da483..09248da 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java @@ -84,7 +84,9 @@ public class TestWriteToDisk extends ExecTest{ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); FileSystem fs = FileSystem.get(conf); Path path = new Path("/tmp/drillSerializable"); - if (fs.exists(path)) fs.delete(path, false); + if (fs.exists(path)) { + fs.delete(path, false); + } FSDataOutputStream out = fs.create(path); wrap.writeToStream(out); @@ -109,4 +111,5 @@ public class TestWriteToDisk extends ExecTest{ } } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java index 5e57dc7..6a3d2f1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java @@ -39,51 +39,52 @@ import com.google.common.io.Files; public class TestAggregateFunction extends PopUnitTestBase { - public void runTest(Object[] values, String planPath, String dataPath) throws Throwable { - - try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - Drillbit bit = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { - - // run query. - bit.run(); - client.connect(); - List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, - Files.toString(FileUtils.getResourceAsFile(planPath), Charsets.UTF_8).replace("#{TEST_FILE}", dataPath)); - - RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); - - QueryResultBatch batch = results.get(0); - assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); - - int i = 0; - for (VectorWrapper<?> v : batchLoader) { - ValueVector.Accessor accessor = v.getValueVector().getAccessor(); - assertEquals(values[i++], (accessor.getObject(0))); - } - - batchLoader.clear(); - for(QueryResultBatch b : results){ - b.release(); - } - } - } + public void runTest(Object[] values, String planPath, String dataPath) throws Throwable { + + try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + Drillbit bit = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + // run query. + bit.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile(planPath), Charsets.UTF_8).replace("#{TEST_FILE}", dataPath)); + + RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); - @Test - public void testSortDate() throws Throwable { - String planPath = "/functions/test_stddev_variance.json"; - String dataPath = "/simple_stddev_variance_input.json"; - Double expectedValues[] = {2.0d, 2.138089935299395d, 2.138089935299395d, 4.0d, 4.571428571428571d, 4.571428571428571d}; + QueryResultBatch batch = results.get(0); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); - runTest(expectedValues, planPath, dataPath); + int i = 0; + for (VectorWrapper<?> v : batchLoader) { + ValueVector.Accessor accessor = v.getValueVector().getAccessor(); + assertEquals(values[i++], (accessor.getObject(0))); + } + + batchLoader.clear(); + for(QueryResultBatch b : results) { + b.release(); + } } + } + + @Test + public void testSortDate() throws Throwable { + String planPath = "/functions/test_stddev_variance.json"; + String dataPath = "/simple_stddev_variance_input.json"; + Double expectedValues[] = {2.0d, 2.138089935299395d, 2.138089935299395d, 4.0d, 4.571428571428571d, 4.571428571428571d}; + + runTest(expectedValues, planPath, dataPath); + } + + @Test + public void testCovarianceCorrelation() throws Throwable { + String planPath = "/functions/test_covariance.json"; + String dataPath = "/covariance_input.json"; + Double expectedValues[] = {4.571428571428571d, 4.857142857142857d, -6.000000000000002d, 4.0d , 4.25d, -5.250000000000002d, 1.0d, 0.9274260335029677d, -1.0000000000000004d}; - @Test - public void testCovarianceCorrelation() throws Throwable { - String planPath = "/functions/test_covariance.json"; - String dataPath = "/covariance_input.json"; - Double expectedValues[] = {4.571428571428571d, 4.857142857142857d, -6.000000000000002d, 4.0d , 4.25d, -5.250000000000002d, 1.0d, 0.9274260335029677d, -1.0000000000000004d}; + runTest(expectedValues, planPath, dataPath); + } - runTest(expectedValues, planPath, dataPath); } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewMathFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewMathFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewMathFunctions.java index 28e667e..54c1700 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewMathFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestNewMathFunctions.java @@ -51,86 +51,93 @@ import com.google.common.io.Resources; public class TestNewMathFunctions { - - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestNewMathFunctions.class); - - DrillConfig c = DrillConfig.create(); - PhysicalPlanReader reader; - FunctionImplementationRegistry registry; - FragmentContext context; - - public Object[] getRunResult(SimpleRootExec exec) { - int size = 0; - for (ValueVector v : exec) { - size++; - } - - Object[] res = new Object [size]; - int i = 0; - for (ValueVector v : exec) { - if (v instanceof VarCharVector) { - res[i++] = new String( ((VarCharVector) v).getAccessor().get(0)); - } else - res[i++] = v.getAccessor().getObject(0); - } - return res; - } - - public void runTest(@Injectable final DrillbitContext bitContext, - @Injectable UserServer.UserClientConnection connection, Object[] expectedResults, String planPath) throws Throwable { - - new NonStrictExpectations(){{ - bitContext.getMetrics(); result = new MetricRegistry(); - bitContext.getAllocator(); result = new TopLevelAllocator(); - bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c); - bitContext.getConfig(); result = c; - bitContext.getCompiler(); result = CodeCompiler.getTestCompiler(c); - }}; - - String planString = Resources.toString(Resources.getResource(planPath), Charsets.UTF_8); - if(reader == null) reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); - if(registry == null) registry = new FunctionImplementationRegistry(c); - if(context == null) context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry); //new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry); - PhysicalPlan plan = reader.readPhysicalPlan(planString); - SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); - - while(exec.next()){ - Object [] res = getRunResult(exec); - assertEquals("return count does not match", res.length, expectedResults.length); - - for (int i = 0; i<res.length; i++) { - assertEquals(String.format("column %s does not match", i), res[i], expectedResults[i]); - } - } - - if(context.getFailureCause() != null){ - throw context.getFailureCause(); - } - - assertTrue(!context.isFailed()); - } - - @Test - public void testTrigoMathFunc(@Injectable final DrillbitContext bitContext, - @Injectable UserServer.UserClientConnection connection) throws Throwable{ - Object [] expected = new Object[] {Math.sin(45), Math.cos(45), Math.tan(45),Math.asin(45), Math.acos(45), Math.atan(45),Math.sinh(45), Math.cosh(45), Math.tanh(45)}; - runTest(bitContext, connection, expected, "functions/testTrigoMathFunctions.json"); - } - - @Test - public void testExtendedMathFunc(@Injectable final DrillbitContext bitContext, - @Injectable UserServer.UserClientConnection connection) throws Throwable{ - BigDecimal d = new BigDecimal("100111111111111111111111111111111111.00000000000000000000000000000000000000000000000000001"); - - Object [] expected = new Object[] {Math.cbrt(1000), Math.log(10), (Math.log(64.0)/Math.log(2.0)), Math.exp(10), Math.toDegrees(0.5), Math.toRadians(45.0), Math.PI, Math.cbrt(d.doubleValue()), Math.log(d.doubleValue()), (Math.log(d.doubleValue())/Math.log(2)), Math.exp(d.doubleValue()), Math.toDegrees(d.doubleValue()), Math.toRadians(d.doubleValue())}; - - runTest(bitContext, connection, expected, "functions/testExtendedMathFunctions.json"); - } - - @Test - public void testTruncDivMod(@Injectable final DrillbitContext bitContext, - @Injectable UserServer.UserClientConnection connection) throws Throwable{ - Object [] expected = new Object[] {101.0, 0, 101, 1010.0, 101, 481.0, 0.001099999999931267}; - runTest(bitContext, connection, expected, "functions/testDivModTruncFunctions.json"); - } -} \ No newline at end of file + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestNewMathFunctions.class); + + DrillConfig c = DrillConfig.create(); + PhysicalPlanReader reader; + FunctionImplementationRegistry registry; + FragmentContext context; + + public Object[] getRunResult(SimpleRootExec exec) { + int size = 0; + for (ValueVector v : exec) { + size++; + } + + Object[] res = new Object [size]; + int i = 0; + for (ValueVector v : exec) { + if (v instanceof VarCharVector) { + res[i++] = new String( ((VarCharVector) v).getAccessor().get(0)); + } else { + res[i++] = v.getAccessor().getObject(0); + } + } + return res; + } + + public void runTest(@Injectable final DrillbitContext bitContext, + @Injectable UserServer.UserClientConnection connection, Object[] expectedResults, String planPath) throws Throwable { + + new NonStrictExpectations() {{ + bitContext.getMetrics(); result = new MetricRegistry(); + bitContext.getAllocator(); result = new TopLevelAllocator(); + bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c); + bitContext.getConfig(); result = c; + bitContext.getCompiler(); result = CodeCompiler.getTestCompiler(c); + }}; + + String planString = Resources.toString(Resources.getResource(planPath), Charsets.UTF_8); + if (reader == null) { + reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + } + if (registry == null) { + registry = new FunctionImplementationRegistry(c); + } + if (context == null) { + context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry); //new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, registry); + } + PhysicalPlan plan = reader.readPhysicalPlan(planString); + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); + + while (exec.next()) { + Object [] res = getRunResult(exec); + assertEquals("return count does not match", res.length, expectedResults.length); + + for (int i = 0; i<res.length; i++) { + assertEquals(String.format("column %s does not match", i), res[i], expectedResults[i]); + } + } + + if (context.getFailureCause() != null) { + throw context.getFailureCause(); + } + + assertTrue(!context.isFailed()); + } + + @Test + public void testTrigoMathFunc(@Injectable final DrillbitContext bitContext, + @Injectable UserServer.UserClientConnection connection) throws Throwable{ + Object [] expected = new Object[] {Math.sin(45), Math.cos(45), Math.tan(45),Math.asin(45), Math.acos(45), Math.atan(45),Math.sinh(45), Math.cosh(45), Math.tanh(45)}; + runTest(bitContext, connection, expected, "functions/testTrigoMathFunctions.json"); + } + + @Test + public void testExtendedMathFunc(@Injectable final DrillbitContext bitContext, + @Injectable UserServer.UserClientConnection connection) throws Throwable{ + BigDecimal d = new BigDecimal("100111111111111111111111111111111111.00000000000000000000000000000000000000000000000000001"); + + Object [] expected = new Object[] {Math.cbrt(1000), Math.log(10), (Math.log(64.0)/Math.log(2.0)), Math.exp(10), Math.toDegrees(0.5), Math.toRadians(45.0), Math.PI, Math.cbrt(d.doubleValue()), Math.log(d.doubleValue()), (Math.log(d.doubleValue())/Math.log(2)), Math.exp(d.doubleValue()), Math.toDegrees(d.doubleValue()), Math.toRadians(d.doubleValue())}; + + runTest(bitContext, connection, expected, "functions/testExtendedMathFunctions.json"); + } + + @Test + public void testTruncDivMod(@Injectable final DrillbitContext bitContext, + @Injectable UserServer.UserClientConnection connection) throws Throwable{ + Object [] expected = new Object[] {101.0, 0, 101, 1010.0, 101, 481.0, 0.001099999999931267}; + runTest(bitContext, connection, expected, "functions/testDivModTruncFunctions.json"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java index a15b503..f878bcb 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java @@ -39,37 +39,37 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{ private RecordBatch incoming; private ScreenRoot screenRoot; - public SimpleRootExec(RootExec e){ - if(e instanceof ScreenRoot){ + public SimpleRootExec(RootExec e) { + if (e instanceof ScreenRoot) { incoming = ((ScreenRoot)e).getIncoming(); screenRoot = (ScreenRoot) e; - }else{ + } else { throw new UnsupportedOperationException(); } } - public FragmentContext getContext(){ + public FragmentContext getContext() { return incoming.getContext(); } - public SelectionVector2 getSelectionVector2(){ + public SelectionVector2 getSelectionVector2() { return incoming.getSelectionVector2(); } - public SelectionVector4 getSelectionVector4(){ + public SelectionVector4 getSelectionVector4() { return incoming.getSelectionVector4(); } @SuppressWarnings("unchecked") - public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass){ + public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass) { TypedFieldId tfid = incoming.getValueVectorId(path); return (T) incoming.getValueAccessorById(vvClass, tfid.getFieldIds()).getValueVector(); } @Override public boolean next() { - switch(incoming.next()){ + switch (incoming.next()) { case NONE: case STOP: incoming.cleanup(); @@ -92,19 +92,19 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{ @Override public Iterator<ValueVector> iterator() { List<ValueVector> vv = Lists.newArrayList(); - for(VectorWrapper<?> vw : incoming){ + for (VectorWrapper<?> vw : incoming) { vv.add(vw.getValueVector()); } return vv.iterator(); } - public int getRecordCount(){ + public int getRecordCount() { return incoming.getRecordCount(); } /// Temporary: for exposing the incoming batch to TestHashTable public RecordBatch getIncoming() { - return incoming; + return incoming; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java index 826ebf5..5212125 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java @@ -38,7 +38,7 @@ public class TestBroadcastExchange extends PopUnitTestBase { public void TestSingleBroadcastExchangeWithTwoScans() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { @@ -52,8 +52,10 @@ public class TestBroadcastExchange extends PopUnitTestBase { .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()); List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, physicalPlan); int count = 0; - for(QueryResultBatch b : results) { - if (b.getHeader().getRowCount() != 0) count += b.getHeader().getRowCount(); + for (QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) { + count += b.getHeader().getRowCount(); + } b.release(); } assertEquals(25, count); @@ -64,7 +66,7 @@ public class TestBroadcastExchange extends PopUnitTestBase { public void TestMultipleSendLocationBroadcastExchange() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { @@ -76,11 +78,14 @@ public class TestBroadcastExchange extends PopUnitTestBase { FileUtils.getResourceAsFile("/sender/broadcast_exchange_long_run.json"), Charsets.UTF_8); List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, physicalPlan); int count = 0; - for(QueryResultBatch b : results) { - if (b.getHeader().getRowCount() != 0) count += b.getHeader().getRowCount(); + for (QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) { + count += b.getHeader().getRowCount(); + } b.release(); } System.out.println(count); } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java index 421c3f5..609bc14 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java @@ -53,7 +53,7 @@ public class TestComparisonFunctions extends ExecTest { public void runTest(@Injectable final DrillbitContext bitContext, @Injectable UserServer.UserClientConnection connection, String expression, int expectedResults) throws Throwable { - new NonStrictExpectations(){{ + new NonStrictExpectations() {{ bitContext.getMetrics(); result = new MetricRegistry(); bitContext.getAllocator(); result = new TopLevelAllocator(); bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c); @@ -62,13 +62,19 @@ public class TestComparisonFunctions extends ExecTest { }}; String planString = Resources.toString(Resources.getResource(COMPARISON_TEST_PHYSICAL_PLAN), Charsets.UTF_8).replaceAll("EXPRESSION", expression); - if(reader == null) reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); - if(registry == null) registry = new FunctionImplementationRegistry(c); - if(context == null) context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry); + if (reader == null) { + reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + } + if (registry == null) { + registry = new FunctionImplementationRegistry(c); + } + if(context == null) { + context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry); + } PhysicalPlan plan = reader.readPhysicalPlan(planString); SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); - while(exec.next()){ + while(exec.next()) { assertEquals(String.format("Expression: %s;", expression), expectedResults, exec.getSelectionVector2().getCount()); // for (ValueVector vv: exec) { // vv.close(); @@ -79,8 +85,7 @@ public class TestComparisonFunctions extends ExecTest { context.close(); - - if(context.getFailureCause() != null){ + if (context.getFailureCause() != null) { throw context.getFailureCause(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java index 5111a49..c64c7a3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java @@ -306,7 +306,7 @@ public class TestConvertFunctions extends BaseTestQuery { List<QueryResultBatch> results = testLogicalWithResults(logicalPlan); int count = 0; RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); - for(QueryResultBatch result : results){ + for (QueryResultBatch result : results){ count += result.getHeader().getRowCount(); loader.load(result.getHeader().getDef(), result.getData()); if (loader.getRecordCount() > 0) { @@ -365,7 +365,9 @@ public class TestConvertFunctions extends BaseTestQuery { protected <T> void verifyPhysicalPlan(String expression, T expectedResults) throws Throwable { expression = expression.replace("\\", "\\\\\\\\"); // "\\\\\\\\" => Java => "\\\\" => JsonParser => "\\" => AntlrParser "\" - if (textFileContent == null) textFileContent = Resources.toString(Resources.getResource(CONVERSION_TEST_PHYSICAL_PLAN), Charsets.UTF_8); + if (textFileContent == null) { + textFileContent = Resources.toString(Resources.getResource(CONVERSION_TEST_PHYSICAL_PLAN), Charsets.UTF_8); + } String planString = textFileContent.replace("__CONVERT_EXPRESSION__", expression); verifyResults(expression, expectedResults, getRunResult(QueryType.PHYSICAL, planString)); @@ -432,4 +434,5 @@ public class TestConvertFunctions extends BaseTestQuery { expected.getClass().getName(), (actual == null ? "null" : actual.getClass().getName()))); } } + }
