more diag fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/79054a85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/79054a85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/79054a85 Branch: refs/heads/diagnostics2 Commit: 79054a85a979e0d2640855edb0e7fd96b69397fb Parents: f4c37bf Author: Jacques Nadeau <[email protected]> Authored: Wed May 21 08:38:11 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed May 21 13:32:00 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 + .../drill/exec/cache/CachedVectorContainer.java | 24 +++- .../drill/exec/cache/local/LocalCache.java | 7 +- .../apache/drill/exec/client/DrillClient.java | 2 +- .../drill/exec/expr/EvaluationVisitor.java | 3 +- .../exec/physical/impl/join/MergeJoinBatch.java | 132 ++++++++++--------- .../impl/project/ProjectRecordBatch.java | 7 +- .../exec/planner/common/DrillWriterRelBase.java | 15 +++ .../exec/planner/logical/DrillWriterRel.java | 1 + .../drill/exec/planner/physical/WriterPrel.java | 7 +- .../physical/visitor/FinalColumnReorderer.java | 5 + .../drill/exec/planner/sql/DrillSqlWorker.java | 2 +- .../org/apache/drill/exec/rpc/BasicServer.java | 5 +- .../drill/exec/rpc/control/ControllerImpl.java | 6 +- .../exec/rpc/data/DataConnectionCreator.java | 6 +- .../org/apache/drill/exec/server/Drillbit.java | 17 ++- .../drill/exec/service/ServiceEngine.java | 18 +-- .../drill/exec/store/sys/SystemTableScan.java | 6 + .../src/main/resources/drill-module.conf | 4 + .../apache/drill/exec/server/TestBitRpc.java | 2 +- .../exec/store/json/JsonRecordReader2Test.java | 4 +- exec/java-exec/src/test/resources/logback.xml | 2 +- pom.xml | 2 +- 23 files changed, 179 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 238fae9..d9e0833 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -71,5 +71,8 @@ public interface ExecConstants { public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet"); public static final String PARQUET_BLOCK_SIZE = "parquet.block.size"; public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024); + public static final String HTTP_ENABLE = "drill.exec.http.enabled"; + public static final String HTTP_PORT = "drill.exec.http.port"; + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java index 1447e28..da0b186 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.cache; import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import org.apache.commons.io.output.ByteArrayOutputStream; @@ -29,7 +31,7 @@ import org.apache.drill.exec.record.WritableBatch; public class CachedVectorContainer extends LoopedAbstractDrillSerializable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachedVectorContainer.class); - private final byte[] data; + private byte[] data; private final BufferAllocator allocator; private VectorContainer container; @@ -42,6 +44,10 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable { va.clear(); } + public CachedVectorContainer(BufferAllocator allocator) { + this.allocator = allocator; + } + public CachedVectorContainer(byte[] data, BufferAllocator allocator) { this.data = data; this.allocator = allocator; @@ -58,6 +64,20 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable { } + + @Override + public void read(DataInput input) throws IOException { + int len = input.readInt(); + this.data = new byte[len]; + input.readFully(data); + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeInt(data.length); + output.write(data); + } + public VectorAccessible get() { if (container == null) { construct(); @@ -66,7 +86,7 @@ public class CachedVectorContainer extends LoopedAbstractDrillSerializable { } public void clear() { - container.clear(); + if(container != null) container.clear(); container = null; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java index 942e09e..1b44c6b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.cache.local; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -31,7 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.util.DataInputInputStream; import org.apache.drill.common.util.DataOutputOutputStream; import org.apache.drill.exec.cache.Counter; import org.apache.drill.exec.cache.DistributedCache; @@ -171,8 +171,7 @@ public class LocalCache implements DistributedCache { } } - ByteArrayDataInput in = ByteStreams.newDataInput(bytes); - InputStream inputStream = DataInputInputStream.constructInputStream(in); + InputStream inputStream = new ByteArrayInputStream(bytes); try { V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator); obj.readFromStream(inputStream); @@ -220,7 +219,7 @@ public class LocalCache implements DistributedCache { if (m.get(key) == null) return null; ByteArrayDataOutput b = m.get(key); byte[] bytes = b.toByteArray(); - return (V) deserialize(m.get(key).toByteArray(), this.clazz); + return (V) deserialize(bytes, this.clazz); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 3b87dc4..92097e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -188,7 +188,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ * Closes this client's connection to the server */ public void close(){ - this.client.close(); + if(this.client != null) this.client.close(); if(ownsZkConnection){ try { this.clusterCoordinator.close(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index 731ab6b..ba846b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -383,7 +383,8 @@ public class EvaluationVisitor { PathSegment seg = e.getReadPath(); int listNum = 0; boolean lastWasArray = false; - while(true){ + + while(seg != null){ if(seg.isArray()){ lastWasArray = true; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 46dea64..121cfec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -130,77 +130,83 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { @Override public IterOutcome next() { + stats.startProcessing(); + + try{ + // we do this in the here instead of the constructor because don't necessary want to start consuming on construction. + status.ensureInitial(); + + // loop so we can start over again if we find a new batch was created. + while(true){ + + JoinOutcome outcome = status.getOutcome(); + // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. + if (outcome == JoinOutcome.BATCH_RETURNED || + outcome == JoinOutcome.SCHEMA_CHANGED) + allocateBatch(); + + // reset the output position to zero after our parent iterates this RecordBatch + if (outcome == JoinOutcome.BATCH_RETURNED || + outcome == JoinOutcome.SCHEMA_CHANGED || + outcome == JoinOutcome.NO_MORE_DATA) + status.resetOutputPos(); + + if (outcome == JoinOutcome.NO_MORE_DATA) { + logger.debug("NO MORE DATA; returning {} NONE"); + return IterOutcome.NONE; + } - // we do this in the here instead of the constructor because don't necessary want to start consuming on construction. - status.ensureInitial(); - - // loop so we can start over again if we find a new batch was created. - while(true){ - - JoinOutcome outcome = status.getOutcome(); - // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. - if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED) - allocateBatch(); - - // reset the output position to zero after our parent iterates this RecordBatch - if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED || - outcome == JoinOutcome.NO_MORE_DATA) - status.resetOutputPos(); + boolean first = false; + if(worker == null){ + try { + logger.debug("Creating New Worker"); + stats.startSetup(); + this.worker = generateNewWorker(); + first = true; + stats.stopSetup(); + } catch (ClassTransformationException | IOException | SchemaChangeException e) { + stats.stopSetup(); + context.fail(new SchemaChangeException(e)); + kill(); + return IterOutcome.STOP; + } + } - if (outcome == JoinOutcome.NO_MORE_DATA) { - logger.debug("NO MORE DATA; returning {} NONE"); - return IterOutcome.NONE; - } + // join until we have a complete outgoing batch + if (!worker.doJoin(status)) + worker = null; - boolean first = false; - if(worker == null){ - try { - logger.debug("Creating New Worker"); - stats.startSetup(); - this.worker = generateNewWorker(); - first = true; - stats.stopSetup(); - } catch (ClassTransformationException | IOException | SchemaChangeException e) { - stats.stopSetup(); - context.fail(new SchemaChangeException(e)); + // get the outcome of the join. + switch(status.getOutcome()){ + case BATCH_RETURNED: + // only return new schema if new worker has been setup. + logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); + return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; + case FAILURE: kill(); return IterOutcome.STOP; + case NO_MORE_DATA: + logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE")); + return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE; + case SCHEMA_CHANGED: + worker = null; + if(status.getOutPosition() > 0){ + // if we have current data, let's return that. + logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); + return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; + }else{ + // loop again to rebuild worker. + continue; + } + case WAITING: + return IterOutcome.NOT_YET; + default: + throw new IllegalStateException(); } } - // join until we have a complete outgoing batch - if (!worker.doJoin(status)) - worker = null; - - // get the outcome of the join. - switch(status.getOutcome()){ - case BATCH_RETURNED: - // only return new schema if new worker has been setup. - logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); - return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; - case FAILURE: - kill(); - return IterOutcome.STOP; - case NO_MORE_DATA: - logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE")); - return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE; - case SCHEMA_CHANGED: - worker = null; - if(status.getOutPosition() > 0){ - // if we have current data, let's return that. - logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); - return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; - }else{ - // loop again to rebuild worker. - continue; - } - case WAITING: - return IterOutcome.NOT_YET; - default: - throw new IllegalStateException(); - } + }finally{ + stats.stopProcessing(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index fe19797..96d3242 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -180,10 +180,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ boolean isAnyWildcard = isAnyWildcard(exprs); if(isAnyWildcard){ + + // add this until we have sv2 project on wildcard working correctly. + if(incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE){ + throw new UnsupportedOperationException("Drill doesn't yet wildcard projects where there is a sv2, patch coming shortly."); + } for(VectorWrapper<?> wrapper : incoming){ ValueVector vvIn = wrapper.getValueVector(); - String name = vvIn.getField().getPath().getLastSegment().getNameSegment().getPath(); + String name = vvIn.getField().getPath().getRootSegment().getPath(); FieldReference ref = new FieldReference(name); TransferPair tp = wrapper.getValueVector().getTransferPair(ref); transfers.add(tp); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java index 357cb2e..03431d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWriterRelBase.java @@ -17,18 +17,33 @@ */ package org.apache.drill.exec.planner.common; +import java.util.List; + import org.apache.drill.exec.planner.logical.CreateTableEntry; import org.eigenbase.rel.RelNode; import org.eigenbase.rel.SingleRel; import org.eigenbase.relopt.Convention; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.sql.type.SqlTypeName; + +import com.google.common.collect.ImmutableList; +import com.google.hive12.common.collect.Lists; /** Base class for logical and physical Writer implemented in Drill. */ public abstract class DrillWriterRelBase extends SingleRel implements DrillRelNode { + private static final List<String> FIELD_NAMES = ImmutableList.of("Fragment", "Number of records written"); private final CreateTableEntry createTableEntry; + protected void setRowType(){ + List<RelDataType> fields = Lists.newArrayList(); + fields.add(this.getCluster().getTypeFactory().createSqlType(SqlTypeName.VARCHAR, 255)); + fields.add(this.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT)); + this.rowType = this.getCluster().getTypeFactory().createStructType(fields, FIELD_NAMES); + } + public DrillWriterRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traitSet, RelNode input, CreateTableEntry createTableEntry) { super(cluster, traitSet, input); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java index f212026..04dd133 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java @@ -30,6 +30,7 @@ public class DrillWriterRel extends DrillWriterRelBase implements DrillRel { public DrillWriterRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, CreateTableEntry createTableEntry) { super(DRILL_LOGICAL, cluster, traitSet, input, createTableEntry); + setRowType(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java index a7f611c..233b20b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java @@ -37,14 +37,11 @@ import com.google.hive12.common.collect.Lists; public class WriterPrel extends DrillWriterRelBase implements Prel { - private static final List<String> FIELD_NAMES = ImmutableList.of("Fragment", "Number of records written"); + public WriterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, CreateTableEntry createTableEntry) { super(Prel.DRILL_PHYSICAL, cluster, traits, child, createTableEntry); - List<RelDataType> fields = Lists.newArrayList(); - fields.add(cluster.getTypeFactory().createSqlType(SqlTypeName.VARCHAR, 255)); - fields.add(cluster.getTypeFactory().createSqlType(SqlTypeName.BIGINT)); - this.rowType = cluster.getTypeFactory().createStructType(fields, FIELD_NAMES); + setRowType(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java index 6ed3c1f..4ea82cd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FinalColumnReorderer.java @@ -48,9 +48,14 @@ public class FinalColumnReorderer extends BasePrelVisitor<Prel, Void, RuntimeExc private Prel addTrivialOrderedProjectPrel(Prel prel){ RelDataType t = prel.getRowType(); + RexBuilder b = prel.getCluster().getRexBuilder(); List<RexNode> projections = Lists.newArrayList(); int projectCount = t.getFieldList().size(); + + // no point in reordering if we only have one column + if(projectCount < 2) return prel; + for(int i =0; i < projectCount; i++){ projections.add(b.makeInputRef(prel, i)); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index df66dcf..eb2c891 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -100,7 +100,7 @@ public class DrillSqlWorker { } public PhysicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException, IOException{ - return getPlan(null); + return getPlan(sql, null); } public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws SqlParseException, ValidationException, RelConversionException, IOException{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index a3307cf..a912778 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -142,15 +142,16 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection return null; } - public int bind(final int initialPort) throws InterruptedException, DrillbitStartupException { + public int bind(final int initialPort, boolean allowPortHunting) throws InterruptedException, DrillbitStartupException { int port = initialPort - 1; while (true) { try { b.bind(++port).sync(); break; } catch (Exception e) { - if (e instanceof BindException) + if (e instanceof BindException && allowPortHunting){ continue; + } throw new DrillbitStartupException("Could not bind Drillbit", e); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java index de8caf6..1cacc4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java @@ -39,19 +39,21 @@ public class ControllerImpl implements Controller { private final ControlMessageHandler handler; private final BootStrapContext context; private final ConnectionManagerRegistry connectionRegistry; + private final boolean allowPortHunting; - public ControllerImpl(BootStrapContext context, ControlMessageHandler handler) { + public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, boolean allowPortHunting) { super(); this.handler = handler; this.context = context; this.connectionRegistry = new ConnectionManagerRegistry(handler, context); + this.allowPortHunting = allowPortHunting; } @Override public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException { server = new ControlServer(handler, context, connectionRegistry); int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT); - port = server.bind(port); + port = server.bind(port, allowPortHunting); DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build(); connectionRegistry.setEndpoint(completeEndpoint); return completeEndpoint; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java index f15494f..9c2ef5b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java @@ -37,17 +37,19 @@ public class DataConnectionCreator implements Closeable { private final BootStrapContext context; private final WorkEventBus workBus; private final DataResponseHandler dataHandler; + private final boolean allowPortHunting; - public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) { + public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting) { super(); this.context = context; this.workBus = workBus; this.dataHandler = dataHandler; + this.allowPortHunting = allowPortHunting; } public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException { server = new DataServer(context, workBus, dataHandler); - int port = server.bind(partialEndpoint.getControlPort() + 1); + int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting); DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build(); return completeEndpoint; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index eba6e92..fb499b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -81,11 +81,18 @@ public class Drillbit implements Closeable{ private volatile RegistrationHandle handle; public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception { - + boolean allowPortHunting = serviceSet != null; + boolean enableHttp = config.getBoolean(ExecConstants.HTTP_ENABLE); this.context = new BootStrapContext(config); this.manager = new WorkManager(context); - this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler()); - this.embeddedJetty = new Server(8047); + this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler(), allowPortHunting); + + if(enableHttp){ + this.embeddedJetty = new Server(config.getInt(ExecConstants.HTTP_PORT)); + }else{ + this.embeddedJetty = null; + } + if(serviceSet != null){ this.coord = serviceSet.getCoordinator(); @@ -99,6 +106,8 @@ public class Drillbit implements Closeable{ } private void startJetty() throws Exception{ + if(embeddedJetty == null) return; + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); context.setContextPath("/"); embeddedJetty.setHandler(context); @@ -131,7 +140,7 @@ public class Drillbit implements Closeable{ logger.warn("Interrupted while sleeping during coordination deregistration."); } try { - embeddedJetty.stop(); + if(embeddedJetty != null) embeddedJetty.stop(); } catch (Exception e) { logger.warn("Failure while shutting down embedded jetty server."); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java index cfbde73..bd745d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java @@ -43,23 +43,25 @@ import com.google.common.io.Closeables; public class ServiceEngine implements Closeable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ServiceEngine.class); - + private final UserServer userServer; private final Controller controller; private final DataConnectionCreator dataPool; private final DrillConfig config; boolean useIP = false; - - public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker, BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler){ + private final boolean allowPortHunting; + + public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker, BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting){ this.userServer = new UserServer(context.getAllocator(), new NioEventLoopGroup(context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), new NamedThreadFactory("UserServer-")), userWorker); - this.controller = new ControllerImpl(context, controlMessageHandler); - this.dataPool = new DataConnectionCreator(context, workBus, dataHandler); + this.controller = new ControllerImpl(context, controlMessageHandler, allowPortHunting); + this.dataPool = new DataConnectionCreator(context, workBus, dataHandler, allowPortHunting); this.config = context.getConfig(); + this.allowPortHunting = allowPortHunting; } - + public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{ - int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT)); + int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT), allowPortHunting); String address = useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName(); DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder() .setAddress(address) @@ -74,7 +76,7 @@ public class ServiceEngine implements Closeable{ public DataConnectionCreator getDataConnectionCreator(){ return dataPool; } - + public Controller getController() { return controller; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java index 9a745ac..b0133f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.store.StoragePluginRegistry; import parquet.org.codehaus.jackson.annotate.JsonCreator; @@ -112,6 +113,11 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{ @Override + public int getOperatorType() { + return CoreOperatorType.SYSTEM_TABLE_SCAN_VALUE; + } + + @Override public GroupScan clone(List<SchemaPath> columns) { return this; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 9a180fd..26205bd 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -65,6 +65,10 @@ drill.exec: { delay: 500 } }, + http: { + enabled: true, + port: 8047 + }, functions: ["org.apache.drill.expr.fn.impl"], network: { start: 35000 http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java index 74f5ba9..f579448 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java @@ -74,7 +74,7 @@ public class TestBitRpc extends ExecTest { DataResponseHandler drp = new BitComTestHandler(); DataServer server = new DataServer(c, workBus, drp); - port = server.bind(port); + port = server.bind(port, false); DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); DataConnectionManager manager = new DataConnectionManager(FragmentHandle.getDefaultInstance(), ep, c2); DataTunnel tunnel = new DataTunnel(manager); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java index 0abdbd3..34bcb5e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java @@ -35,14 +35,14 @@ public class JsonRecordReader2Test extends BaseTestQuery{ } @Test - public void z() throws Exception{ + public void testComplexMultipleTimes() throws Exception{ for(int i =0 ; i < 5; i++){ test("select * from cp.`join/merge_join.json`"); } } @Test - public void y() throws Exception{ + public void trySimpleQueryWithLimit() throws Exception{ test("select * from cp.`limit/test1.json` limit 10"); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/exec/java-exec/src/test/resources/logback.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/logback.xml b/exec/java-exec/src/test/resources/logback.xml index cd3d971..b8e0ca2 100644 --- a/exec/java-exec/src/test/resources/logback.xml +++ b/exec/java-exec/src/test/resources/logback.xml @@ -42,7 +42,7 @@ <!-- </logger> --> <root> - <level value="info" /> + <level value="error" /> <appender-ref ref="STDOUT" /> </root> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/79054a85/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d4c077a..5d976e8 100644 --- a/pom.xml +++ b/pom.xml @@ -260,7 +260,7 @@ <artifactId>maven-surefire-plugin</artifactId> <version>2.17</version> <configuration> - <argLine>-Xms512m -Xmx1g -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine> + <argLine>-Xms512m -Xmx1g -Ddrill.exec.http.enabled=false -XX:MaxPermSize=256M -XX:MaxDirectMemorySize=2096M -XX:+CMSClassUnloadingEnabled</argLine> <forkCount>4</forkCount> <reuseForks>true</reuseForks> <additionalClasspathElements>
