DRILL-836: Drill needs to return complex types (e.g., map and array) as a JSON string
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fc00bc4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fc00bc4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fc00bc4b Branch: refs/heads/master Commit: fc00bc4bc20f34b24df43a30c1340e5f1f96de54 Parents: 5da52cb Author: Aditya Kishore <[email protected]> Authored: Thu Jun 12 14:50:28 2014 -0700 Committer: Aditya Kishore <[email protected]> Committed: Fri Jun 13 03:49:30 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/config/Flatten.java | 71 ++++++++++++ .../impl/project/FlattenBatchCreator.java | 42 +++++++ .../impl/project/ProjectRecordBatch.java | 26 ++++- .../exec/planner/physical/FlattenPrel.java | 62 ++++++++++ .../physical/visitor/FlattenPrelVisitor.java | 40 +++++++ .../planner/sql/handlers/DefaultSqlHandler.java | 45 ++++++-- .../apache/drill/exec/rpc/user/UserClient.java | 6 +- .../apache/drill/exec/rpc/user/UserServer.java | 29 +++-- .../apache/drill/exec/rpc/user/UserSession.java | 11 +- .../java/org/apache/drill/PlanningBase.java | 4 +- .../exec/physical/impl/TestOptiqPlans.java | 2 +- .../drill/exec/proto/SchemaUserProtos.java | 7 ++ .../apache/drill/exec/proto/UserBitShared.java | 15 ++- .../org/apache/drill/exec/proto/UserProtos.java | 114 ++++++++++++++++--- .../exec/proto/beans/CoreOperatorType.java | 4 +- .../exec/proto/beans/UserToBitHandshake.java | 23 ++++ protocol/src/main/protobuf/User.proto | 49 ++++---- protocol/src/main/protobuf/UserBitShared.proto | 1 + 18 files changed, 471 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java new file mode 100644 index 0000000..2bcaab3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.config; + +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.Size; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("flatten") +public class Flatten extends AbstractSingle { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Flatten.class); + + @JsonCreator + public Flatten(@JsonProperty("child") PhysicalOperator child) { + super(child); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{ + return physicalVisitor.visitOp(this, value); + } + + @Override + public OperatorCost getCost() { + return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount()); + } + + @Override + public Size getSize() { + return child.getSize(); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new Flatten(child); + } + + @Override + public SelectionVectorMode getSVMode() { + return child.getSVMode(); + } + + @Override + public int getOperatorType() { + return CoreOperatorType.FLATTEN_VALUE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java new file mode 100644 index 0000000..9bea73c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.project; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.Flatten; +import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import com.google.common.base.Preconditions; + +public class FlattenBatchCreator implements BatchCreator<Flatten> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class); + + @Override + public RecordBatch getBatch(FragmentContext context, Flatten flatten, List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.size() == 1); + return new ProjectRecordBatch(new Project(null, flatten.getChild()), + children.iterator().next(), + context); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 93cd19d..61c256b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -18,17 +18,20 @@ package org.apache.drill.exec.physical.impl.project; import java.io.IOException; -import java.util.HashSet; import java.util.List; +import org.apache.drill.common.expression.ConvertExpression; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; +import 
org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.FunctionCallFactory; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.PathSegment.NameSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -48,7 +51,6 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import com.carrotsearch.hppc.IntOpenHashSet; @@ -170,7 +172,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ protected void setupNewSchema() throws SchemaChangeException{ this.allocationVectors = Lists.newArrayList(); container.clear(); - final List<NamedExpression> exprs = popConfig.getExprs(); + final List<NamedExpression> exprs = getExpressionList(); final ErrorCollector collector = new ErrorCollectorImpl(); final List<TransferPair> transfers = Lists.newArrayList(); @@ -250,5 +252,23 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } } + private List<NamedExpression> getExpressionList() { + if (popConfig.getExprs() != null) { + return popConfig.getExprs(); + } + + List<NamedExpression> exprs = Lists.newArrayList(); + for (MaterializedField field : incoming.getSchema()) { + if (Types.isComplex(field.getType())) { + exprs.add(new NamedExpression( + 
FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN), + new FieldReference(field.getPath())) + ); + } else { + exprs.add(new NamedExpression(field.getPath(), new FieldReference(field.getPath()))); + } + } + return exprs; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java new file mode 100644 index 0000000..f541422 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.physical; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.Flatten; +import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.eigenbase.rel.SingleRel; + +public class FlattenPrel extends SingleRel implements Prel { + + public FlattenPrel(Prel phyRelNode) { + super(phyRelNode.getCluster(), phyRelNode.getTraitSet(), phyRelNode); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Flatten p = new Flatten(((Prel) getChild()).getPhysicalOperator(creator)); + p.setOperatorId(creator.getOperatorId(this)); + return p; + } + + @Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.DEFAULT; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitPrel(this, value); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java new file mode 100644 index 0000000..5892782 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical.visitor; + +import java.util.Collections; + +import org.apache.drill.exec.planner.physical.FlattenPrel; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.ScreenPrel; +import org.eigenbase.rel.RelNode; + +public class FlattenPrelVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> { + + private static final FlattenPrelVisitor INSTANCE = new FlattenPrelVisitor(); + + public static Prel addFlattenPrel(Prel prel) { + return prel.accept(INSTANCE, null); + } + + @Override + public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException { + return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)new FlattenPrel((Prel)prel.getChild()))); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 
index 883b039..aca0ae6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.explain.PrelSequencer; +import org.apache.drill.exec.planner.physical.visitor.FlattenPrelVisitor; import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer; import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor; import org.apache.drill.exec.planner.physical.visitor.RelUniqifier; @@ -140,22 +141,46 @@ public class DefaultSqlHandler extends AbstractSqlHandler { RelTraitSet traits = drel.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON); Prel phyRelNode = (Prel) planner.transform(DrillSqlWorker.PHYSICAL_MEM_RULES, traits, drel); - // Join might cause naming conflicts from its left and right child. - // In such case, we have to insert Project to rename the conflicting names. + /* The order of the following transformation is important */ + + /* + * 1.) + * Join might cause naming conflicts from its left and right child. + * In such case, we have to insert Project to rename the conflicting names. + */ phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode); - // Since our operators work via names rather than indices, we have to make to reorder any output - // before we return data to the user as we may have accindentally shuffled things. This adds - // a trivial project to reorder columns prior to output. + /* + * 2.) + * Since our operators work via names rather than indices, we have to make to reorder any + * output before we return data to the user as we may have accidentally shuffled things. 
+ * This adds a trivial project to reorder columns prior to output. + */ phyRelNode = FinalColumnReorderer.addFinalColumnOrdering(phyRelNode); - // Make sure that the no rels are repeats. This could happen in the case of querying the same table twice as Optiq may canonicalize these. + /* 3.) + * Next, we add any required selection vector removers given the supported encodings of each + * operator. This will ultimately move to a new trait but we're managing here for now to avoid + * introducing new issues in planning before the next release + */ + phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode); + + /* 4.) + * if the client does not support complex types (Map, Repeated) + * insert a project which would convert them to JSON strings + */ + if (!context.getSession().isSupportComplexTypes()) { + logger.debug("Client does not support complex types, add Flatten operator."); + phyRelNode = FlattenPrelVisitor.addFlattenPrel(phyRelNode); + } + + /* 5.) + * Finally, make sure that no rels are repeated. + * This could happen in the case of querying the same table twice as Optiq may canonicalize these. + */ phyRelNode = RelUniqifier.uniqifyGraph(phyRelNode); - // the last thing we do is add any required selection vector removers given the supported encodings of each - // operator.
This will ultimately move to a new trait but we're managing here for now to avoid introducing new - // issues in planning before the next release - return SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode); + return phyRelNode; } protected PhysicalOperator convertToPop(Prel prel) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 0e3cc6b..277bb0c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -56,10 +56,12 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand throws RpcException, InterruptedException { UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() .setRpcVersion(UserRpcConfig.RPC_VERSION) - .setSupportListening(true); + .setSupportListening(true) + .setSupportComplexTypes(true); - if (props != null) + if (props != null) { hsBuilder.setProperties(props); + } this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 2a4c2cc..e96ba6c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -27,12 +27,10 @@ import java.io.IOException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.UserBitShared.UserCredentials; import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; -import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicServer; @@ -61,7 +59,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec @Override protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { - // a user server only expects acknowledgements on messages it creates. + // a user server only expects acknowledgments on messages it creates. switch (rpcType) { case RpcType.ACK_VALUE: return Ack.getDefaultInstance(); @@ -76,7 +74,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec switch (rpcType) { case RpcType.RUN_QUERY_VALUE: - // logger.debug("Received query to run. Returning query handle."); + logger.trace("Received query to run. Returning query handle."); try { RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody)); return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query)); @@ -85,7 +83,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec } case RpcType.REQUEST_RESULTS_VALUE: - // logger.debug("Received results requests. Returning empty query result."); + logger.trace("Received results requests. 
Returning empty query result."); try { RequestResults req = RequestResults.PARSER.parseFrom(new ByteBufInputStream(pBody)); return new Response(RpcType.QUERY_RESULT, worker.getResult(connection, req)); @@ -112,8 +110,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec super(channel); } - void setUser(UserCredentials credentials, UserProperties props) throws IOException{ - session = new UserSession(worker.getSystemOptions(), credentials, props); + void setUser(UserToBitHandshake inbound) throws IOException { + session = new UserSession(worker.getSystemOptions(), inbound.getCredentials(), inbound.getProperties()); + session.setSupportComplexTypes(inbound.getSupportComplexTypes()); } public UserSession getSession(){ @@ -121,7 +120,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec } public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){ -// logger.debug("Sending result to client with {}", result); + logger.trace("Sending result to client with {}", result); send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers()); } @@ -130,7 +129,6 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec return alloc; } - } @Override @@ -140,16 +138,23 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec @Override protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(final UserClientConnection connection) { + return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){ @Override public MessageLite getHandshakeResponse(UserToBitHandshake inbound) throws Exception { -// logger.debug("Handling handshake from user to bit. {}", inbound); - if(inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. 
Expected %d, actual %d.", inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION)); - connection.setUser(inbound.getCredentials(), inbound.getProperties()); + logger.trace("Handling handshake from user to bit. {}", inbound); + if(inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) { + throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION)); + } + + connection.setUser(inbound); + return BitToUserHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(); } + }; + } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index 6ecffaf..18e365e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.rpc.user; import java.io.IOException; -import java.util.Iterator; import java.util.Map; import net.hydromatic.optiq.SchemaPlus; @@ -38,6 +37,7 @@ public class UserSession { private DrillUser user; private boolean enableExchanges = true; + private boolean supportComplexTypes = false; private UserCredentials credentials; private Map<String, String> properties; private OptionManager options; @@ -107,4 +107,13 @@ public class UserSession { return schema; } + public boolean isSupportComplexTypes() { + return supportComplexTypes; + } + + public UserSession setSupportComplexTypes(boolean supportComplexType) { + this.supportComplexTypes = supportComplexType; + return this; + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java index ad114ab..a819453 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -100,7 +100,7 @@ public class PlanningBase extends ExecTest{ registry.init(); final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config); final SchemaPlus root = Frameworks.createRootSchema(false); - registry.getSchemaFactory().registerSchemas(new UserSession(null, null, null), root); + registry.getSchemaFactory().registerSchemas(new UserSession(null, null, null).setSupportComplexTypes(true), root); @@ -113,7 +113,7 @@ public class PlanningBase extends ExecTest{ context.getFunctionRegistry(); result = functionRegistry; context.getSession(); - result = new UserSession(null, null, null); + result = new UserSession(null, null, null).setSupportComplexTypes(true); context.getCurrentEndpoint(); result = DrillbitEndpoint.getDefaultInstance(); context.getActiveEndpoints(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java index 199ecfc..f6200f0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java @@ -110,7 +110,7 @@ public class 
TestOptiqPlans extends ExecTest { }; RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet(); DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create())); - QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext); + QueryContext qc = new QueryContext(new UserSession(null, null, null).setSupportComplexTypes(true), QueryId.getDefaultInstance(), bitContext); PhysicalPlanReader reader = bitContext.getPlanReader(); LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8)); PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java index c984876..3b056cf 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java @@ -278,6 +278,8 @@ public final class SchemaUserProtos if(message.hasProperties()) output.writeObject(5, message.getProperties(), org.apache.drill.exec.proto.SchemaUserProtos.UserProperties.WRITE, false); + if(message.hasSupportComplexTypes()) + output.writeBool(6, message.getSupportComplexTypes(), false); } public boolean isInitialized(org.apache.drill.exec.proto.UserProtos.UserToBitHandshake message) { @@ -334,6 +336,9 @@ public final class SchemaUserProtos builder.setProperties(input.mergeObject(org.apache.drill.exec.proto.UserProtos.UserProperties.newBuilder(), 
org.apache.drill.exec.proto.SchemaUserProtos.UserProperties.MERGE)); break; + case 6: + builder.setSupportComplexTypes(input.readBool()); + break; default: input.handleUnknownField(number, this); } @@ -379,6 +384,7 @@ public final class SchemaUserProtos case 3: return "rpcVersion"; case 4: return "credentials"; case 5: return "properties"; + case 6: return "supportComplexTypes"; default: return null; } } @@ -395,6 +401,7 @@ public final class SchemaUserProtos fieldMap.put("rpcVersion", 3); fieldMap.put("credentials", 4); fieldMap.put("properties", 5); + fieldMap.put("supportComplexTypes", 6); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 105281a..c100968 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -454,6 +454,10 @@ public final class UserBitShared { * <code>INFO_SCHEMA_SUB_SCAN = 30;</code> */ INFO_SCHEMA_SUB_SCAN(30, 30), + /** + * <code>FLATTEN = 31;</code> + */ + FLATTEN(31, 31), ; /** @@ -580,6 +584,10 @@ public final class UserBitShared { * <code>INFO_SCHEMA_SUB_SCAN = 30;</code> */ public static final int INFO_SCHEMA_SUB_SCAN_VALUE = 30; + /** + * <code>FLATTEN = 31;</code> + */ + public static final int FLATTEN_VALUE = 31; public final int getNumber() { return value; } @@ -617,6 +625,7 @@ public final class UserBitShared { case 28: return TEXT_SUB_SCAN; case 29: return JSON_SUB_SCAN; case 30: return INFO_SCHEMA_SUB_SCAN; + case 31: return FLATTEN; default: return null; } } @@ -16439,7 +16448,7 @@ public final class UserBitShared { "*/\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010P" + 
"HYSICAL\020\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022" + "\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n", - "\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\345" + + "\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\362" + "\004\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024" + "\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH" + "_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOI" + @@ -16455,8 +16464,8 @@ public final class UserBitShared { "K_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRE" + "CT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_S" + "UB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCH" + - "EMA_SUB_SCAN\020\036B.\n\033org.apache.drill.exec." + - "protoB\rUserBitSharedH\001" + "EMA_SUB_SCAN\020\036\022\013\n\007FLATTEN\020\037B.\n\033org.apach" + + "e.drill.exec.protoB\rUserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java index d9f4c20..048bd20 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java @@ -1669,6 +1669,16 @@ public final class UserProtos { * <code>optional .exec.user.UserProperties properties = 5;</code> */ org.apache.drill.exec.proto.UserProtos.UserPropertiesOrBuilder getPropertiesOrBuilder(); + + // optional bool support_complex_types = 6 
[default = false]; + /** + * <code>optional bool support_complex_types = 6 [default = false];</code> + */ + boolean hasSupportComplexTypes(); + /** + * <code>optional bool support_complex_types = 6 [default = false];</code> + */ + boolean getSupportComplexTypes(); } /** * Protobuf type {@code exec.user.UserToBitHandshake} @@ -1768,6 +1778,11 @@ public final class UserProtos { bitField0_ |= 0x00000010; break; } + case 48: { + bitField0_ |= 0x00000020; + supportComplexTypes_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -1900,12 +1915,29 @@ public final class UserProtos { return properties_; } + // optional bool support_complex_types = 6 [default = false]; + public static final int SUPPORT_COMPLEX_TYPES_FIELD_NUMBER = 6; + private boolean supportComplexTypes_; + /** + * <code>optional bool support_complex_types = 6 [default = false];</code> + */ + public boolean hasSupportComplexTypes() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * <code>optional bool support_complex_types = 6 [default = false];</code> + */ + public boolean getSupportComplexTypes() { + return supportComplexTypes_; + } + private void initFields() { channel_ = org.apache.drill.exec.proto.UserBitShared.RpcChannel.USER; supportListening_ = false; rpcVersion_ = 0; credentials_ = org.apache.drill.exec.proto.UserBitShared.UserCredentials.getDefaultInstance(); properties_ = org.apache.drill.exec.proto.UserProtos.UserProperties.getDefaultInstance(); + supportComplexTypes_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1940,6 +1972,9 @@ public final class UserProtos { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeMessage(5, properties_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeBool(6, supportComplexTypes_); + } getUnknownFields().writeTo(output); } @@ -1969,6 +2004,10 @@ public final class UserProtos { size += 
com.google.protobuf.CodedOutputStream .computeMessageSize(5, properties_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(6, supportComplexTypes_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -2105,6 +2144,8 @@ public final class UserProtos { propertiesBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000010); + supportComplexTypes_ = false; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -2161,6 +2202,10 @@ public final class UserProtos { } else { result.properties_ = propertiesBuilder_.build(); } + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.supportComplexTypes_ = supportComplexTypes_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -2192,6 +2237,9 @@ public final class UserProtos { if (other.hasProperties()) { mergeProperties(other.getProperties()); } + if (other.hasSupportComplexTypes()) { + setSupportComplexTypes(other.getSupportComplexTypes()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2561,6 +2609,39 @@ public final class UserProtos { return propertiesBuilder_; } + // optional bool support_complex_types = 6 [default = false]; + private boolean supportComplexTypes_ ; + /** + * <code>optional bool support_complex_types = 6 [default = false];</code> + */ + public boolean hasSupportComplexTypes() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * <code>optional bool support_complex_types = 6 [default = false];</code> + */ + public boolean getSupportComplexTypes() { + return supportComplexTypes_; + } + /** + * <code>optional bool support_complex_types = 6 [default = false];</code> + */ + public Builder setSupportComplexTypes(boolean value) { + bitField0_ |= 0x00000020; + supportComplexTypes_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool support_complex_types = 6 [default = 
false];</code> + */ + public Builder clearSupportComplexTypes() { + bitField0_ = (bitField0_ & ~0x00000020); + supportComplexTypes_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:exec.user.UserToBitHandshake) } @@ -4250,25 +4331,26 @@ public final class UserProtos { "\032\023UserBitShared.proto\"&\n\010Property\022\013\n\003key" + "\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"9\n\016UserProperties\022" + "\'\n\nproperties\030\001 \003(\0132\023.exec.user.Property" + - "\"\326\001\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" + + "\"\374\001\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" + "2\027.exec.shared.RpcChannel:\004USER\022\031\n\021suppo" + "rt_listening\030\002 \001(\010\022\023\n\013rpc_version\030\003 \001(\005\022" + "1\n\013credentials\030\004 \001(\0132\034.exec.shared.UserC" + "redentials\022-\n\nproperties\030\005 \001(\0132\031.exec.us" + - "er.UserProperties\"S\n\016RequestResults\022&\n\010q", - "uery_id\030\001 \001(\0132\024.exec.shared.QueryId\022\031\n\021m" + - "aximum_responses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014re" + - "sults_mode\030\001 \001(\0162\033.exec.user.QueryResult" + - "sMode\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryT" + - "ype\022\014\n\004plan\030\003 \001(\t\")\n\022BitToUserHandshake\022" + - "\023\n\013rpc_version\030\002 \001(\005*\270\001\n\007RpcType\022\r\n\tHAND" + - "SHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUE" + - "RY\020\003\022\020\n\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULT" + - "S\020\005\022\020\n\014QUERY_RESULT\020\006\022\020\n\014QUERY_HANDLE\020\007\022" + - "\026\n\022REQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION", - "_LIST\020\t*#\n\020QueryResultsMode\022\017\n\013STREAM_FU" + - "LL\020\001B+\n\033org.apache.drill.exec.protoB\nUse" + - "rProtosH\001" + "er.UserProperties\022$\n\025support_complex_typ", + "es\030\006 
\001(\010:\005false\"S\n\016RequestResults\022&\n\010que" + + "ry_id\030\001 \001(\0132\024.exec.shared.QueryId\022\031\n\021max" + + "imum_responses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014resu" + + "lts_mode\030\001 \001(\0162\033.exec.user.QueryResultsM" + + "ode\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryTyp" + + "e\022\014\n\004plan\030\003 \001(\t\")\n\022BitToUserHandshake\022\023\n" + + "\013rpc_version\030\002 \001(\005*\270\001\n\007RpcType\022\r\n\tHANDSH" + + "AKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY" + + "\020\003\022\020\n\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020" + + "\005\022\020\n\014QUERY_RESULT\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n", + "\022REQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_L" + + "IST\020\t*#\n\020QueryResultsMode\022\017\n\013STREAM_FULL" + + "\020\001B+\n\033org.apache.drill.exec.protoB\nUserP" + + "rotosH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4292,7 +4374,7 @@ public final class UserProtos { internal_static_exec_user_UserToBitHandshake_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_exec_user_UserToBitHandshake_descriptor, - new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", }); + new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", "SupportComplexTypes", }); internal_static_exec_user_RequestResults_descriptor = getDescriptor().getMessageTypes().get(3); internal_static_exec_user_RequestResults_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java ---------------------------------------------------------------------- diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index 3690625..abd7b78 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -52,7 +52,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO TEXT_WRITER(27), TEXT_SUB_SCAN(28), JSON_SUB_SCAN(29), - INFO_SCHEMA_SUB_SCAN(30); + INFO_SCHEMA_SUB_SCAN(30), + FLATTEN(31); public final int number; @@ -101,6 +102,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO case 28: return TEXT_SUB_SCAN; case 29: return JSON_SUB_SCAN; case 30: return INFO_SCHEMA_SUB_SCAN; + case 31: return FLATTEN; default: return null; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java index 70235b1..67ac4e5 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java @@ -46,12 +46,14 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB static final UserToBitHandshake DEFAULT_INSTANCE = new UserToBitHandshake(); + static final Boolean DEFAULT_SUPPORT_COMPLEX_TYPES = new Boolean(false); private RpcChannel channel; private Boolean supportListening; private int rpcVersion; private UserCredentials credentials; private UserProperties properties; + private Boolean supportComplexTypes = DEFAULT_SUPPORT_COMPLEX_TYPES; public UserToBitHandshake() { @@ -125,6 +127,19 
@@ public final class UserToBitHandshake implements Externalizable, Message<UserToB return this; } + // supportComplexTypes + + public Boolean getSupportComplexTypes() + { + return supportComplexTypes; + } + + public UserToBitHandshake setSupportComplexTypes(Boolean supportComplexTypes) + { + this.supportComplexTypes = supportComplexTypes; + return this; + } + // java serialization public void readExternal(ObjectInput in) throws IOException @@ -196,6 +211,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB message.properties = input.mergeObject(message.properties, UserProperties.getSchema()); break; + case 6: + message.supportComplexTypes = input.readBool(); + break; default: input.handleUnknownField(number, this); } @@ -221,6 +239,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB if(message.properties != null) output.writeObject(5, message.properties, UserProperties.getSchema(), false); + + if(message.supportComplexTypes != null && message.supportComplexTypes != DEFAULT_SUPPORT_COMPLEX_TYPES) + output.writeBool(6, message.supportComplexTypes, false); } public String getFieldName(int number) @@ -232,6 +253,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB case 3: return "rpcVersion"; case 4: return "credentials"; case 5: return "properties"; + case 6: return "supportComplexTypes"; default: return null; } } @@ -250,6 +272,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB __fieldMap.put("rpcVersion", 3); __fieldMap.put("credentials", 4); __fieldMap.put("properties", 5); + __fieldMap.put("supportComplexTypes", 6); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/protobuf/User.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto index ea12323..6c41a37 100644 --- 
a/protocol/src/main/protobuf/User.proto +++ b/protocol/src/main/protobuf/User.proto @@ -7,28 +7,23 @@ option optimize_for = SPEED; import "SchemaDef.proto"; import "UserBitShared.proto"; - - ////// UserToBit RPC /////// enum RpcType { - HANDSHAKE = 0; - ACK = 1; - GOODBYE = 2; - - // user to bit - RUN_QUERY = 3; - CANCEL_QUERY = 4; - REQUEST_RESULTS = 5; - - - // bit to user - QUERY_RESULT = 6; - QUERY_HANDLE = 7; - - REQ_META_FUNCTIONS = 8; - RESP_FUNCTION_LIST = 9; - - + HANDSHAKE = 0; + ACK = 1; + GOODBYE = 2; + + // user to bit + RUN_QUERY = 3; + CANCEL_QUERY = 4; + REQUEST_RESULTS = 5; + + // bit to user + QUERY_RESULT = 6; + QUERY_HANDLE = 7; + + REQ_META_FUNCTIONS = 8; + RESP_FUNCTION_LIST = 9; } message Property { @@ -46,6 +41,7 @@ message UserToBitHandshake { optional int32 rpc_version = 3; optional exec.shared.UserCredentials credentials = 4; optional UserProperties properties = 5; + optional bool support_complex_types = 6 [default = false]; } message RequestResults { @@ -60,16 +56,11 @@ message RunQuery { } enum QueryResultsMode { - STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available. - // STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query. Once the query is completed, server will inform the client of the first query chunk. - // QUERY_FOR_STATUS = 3; // Client will need to query for status of query. + STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available. + // STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query. Once the query is completed, server will inform the client of the first query chunk. + // QUERY_FOR_STATUS = 3; // Client will need to query for status of query. 
} - message BitToUserHandshake { - optional int32 rpc_version = 2; + optional int32 rpc_version = 2; } - - - - http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/protobuf/UserBitShared.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 4bafeb8..b754ee5 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -197,4 +197,5 @@ enum CoreOperatorType { TEXT_SUB_SCAN = 28; JSON_SUB_SCAN = 29; INFO_SCHEMA_SUB_SCAN = 30; + FLATTEN = 31; }
