http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java index 393773d..43089d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java @@ -74,13 +74,17 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ @Override protected ServerHandshakeHandler<BitControlHandshake> getHandshakeHandler(final ControlConnection connection) { - return new ServerHandshakeHandler<BitControlHandshake>(RpcType.HANDSHAKE, BitControlHandshake.PARSER){ + return new ServerHandshakeHandler<BitControlHandshake>(RpcType.HANDSHAKE, BitControlHandshake.PARSER) { @Override public MessageLite getHandshakeResponse(BitControlHandshake inbound) throws Exception { // logger.debug("Handling handshake from other bit. {}", inbound); - if(inbound.getRpcVersion() != ControlRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), ControlRpcConfig.RPC_VERSION)); - if(!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getControlPort() < 1) throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.", inbound.getEndpoint())); + if (inbound.getRpcVersion() != ControlRpcConfig.RPC_VERSION) { + throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), ControlRpcConfig.RPC_VERSION)); + } + if (!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getControlPort() < 1) { + throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.", inbound.getEndpoint())); + } connection.setEndpoint(inbound.getEndpoint()); // add the @@ -129,5 +133,4 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ } - }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index 60d2cdf..5c126e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -64,9 +64,10 @@ public class WorkEventBus { public void setFragmentStatusListener(QueryId queryId, FragmentStatusListener listener) throws RpcException { logger.debug("Adding fragment status listener for queryId {}.", queryId); FragmentStatusListener old = listeners.putIfAbsent(queryId, listener); - if (old != null) + if (old != null) { throw new RpcException( "Failure. The provided handle already exists in the listener pool. You need to remove one listener before adding another."); + } } public void status(FragmentStatus status) { @@ -83,12 +84,13 @@ public class WorkEventBus { public void setRootFragmentManager(RootFragmentManager fragmentManager) { FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); - if (old != null) + if (old != null) { throw new IllegalStateException( "Tried to set fragment manager when has already been set for the provided fragment handle."); + } } - public FragmentManager getFragmentManager(FragmentHandle handle){ + public FragmentManager getFragmentManager(FragmentHandle handle) { return managers.get(handle); } @@ -103,9 +105,11 @@ public class WorkEventBus { return null; } FragmentManager manager = managers.get(handle); - if (manager != null) return manager; + if (manager != null) { + return manager; + } DistributedMap<FragmentHandle, PlanFragment> planCache = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE); - for(Map.Entry<FragmentHandle, PlanFragment> e : planCache.getLocalEntries()){ + for (Map.Entry<FragmentHandle, PlanFragment> e : planCache.getLocalEntries()) { // logger.debug("Key: {}", e.getKey()); // logger.debug("Value: {}", e.getValue()); } @@ -130,7 +134,8 @@ public class WorkEventBus { return manager; } - public void removeFragmentManager(FragmentHandle handle){ + public void removeFragmentManager(FragmentHandle handle) { managers.remove(handle); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index 67856f3..a9eb66f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -74,20 +74,22 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl throw new UnsupportedOperationException("DataClient is unidirectional by design."); } - BufferAllocator getAllocator(){ + BufferAllocator getAllocator() { return allocator; } @Override protected void validateHandshake(BitServerHandshake handshake) throws RpcException { - if(handshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION)); + if (handshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) { + throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION)); + } } @Override protected void finalizeConnection(BitServerHandshake handshake, DataClientConnection connection) { } - public DataClientConnection getConnection(){ + public DataClientConnection getConnection() { return this.connection; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java index ecd10eb..3a569db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java @@ -36,7 +36,7 @@ public class DataClientConnection extends RemoteConnection{ private final DataClient client; private final UUID id; - public DataClientConnection(Channel channel, DataClient client){ + public DataClientConnection(Channel channel, DataClient client) { super(channel); this.client = client; // we use a local listener pool unless a global one is provided. @@ -49,7 +49,7 @@ public class DataClientConnection extends RemoteConnection{ } public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, - SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies){ + SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) { client.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies); } @@ -64,17 +64,28 @@ public class DataClientConnection extends RemoteConnection{ @Override public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null) return false; - if (getClass() != obj.getClass()) return false; + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } DataClientConnection other = (DataClientConnection) obj; if (id == null) { - if (other.id != null) return false; - } else if (!id.equals(other.id)) return false; + if (other.id != null) { + return false; + } + } else if (!id.equals(other.id)) { + return false; + } return true; } - public void shutdownIfClient(){ + public void shutdownIfClient() { Closeables.closeQuietly(client); } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 8e503ec..2c6e02c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -83,12 +83,14 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { @Override public MessageLite getHandshakeResponse(BitClientHandshake inbound) throws Exception { // logger.debug("Handling handshake from other bit. {}", inbound); - if (inbound.getRpcVersion() != DataRpcConfig.RPC_VERSION) + if (inbound.getRpcVersion() != DataRpcConfig.RPC_VERSION) { throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), DataRpcConfig.RPC_VERSION)); - if (inbound.getChannel() != RpcChannel.BIT_DATA) + } + if (inbound.getChannel() != RpcChannel.BIT_DATA) { throw new RpcException(String.format("Invalid NodeMode. Expected BIT_DATA but received %s.", inbound.getChannel())); + } return BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build(); } @@ -113,8 +115,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } } BufferAllocator allocator = manager.getFragmentContext().getAllocator(); - if(body != null){ - if(!allocator.takeOwnership((DrillBuf) body.unwrap())){ + if (body != null) { + if (!allocator.takeOwnership((DrillBuf) body.unwrap())) { dataHandler.handle(connection, manager, OOM_FRAGMENT, null, null); } } @@ -142,8 +144,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } - - @Override public OutOfMemoryHandler getOutOfMemoryHandler() { return new OutOfMemoryHandler() { @@ -158,4 +158,5 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { return new DataProtobufLengthDecoder(allocator, outOfMemoryHandler); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java index e36a1c6..ab4c9ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java @@ -31,10 +31,11 @@ public class QueryResultBatch { // logger.debug("New Result Batch with header {} and data {}", header, data); this.header = header; this.data = data; - if(this.data != null) data.retain(); + if (this.data != null) { + data.retain(); + } } - public QueryResult getHeader() { return header; } @@ -43,13 +44,14 @@ public class QueryResultBatch { return data; } - - public boolean hasData(){ + public boolean hasData() { return data != null; } - public void release(){ - if(data != null) data.release(); + public void release() { + if (data != null) { + data.release(); + } } @Override @@ -57,6 +59,4 @@ public class QueryResultBatch { return "QueryResultBatch [header=" + header + ", data=" + data + "]"; } - - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index b12a4cf..9015a16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -49,7 +49,7 @@ public class QueryResultHandler { private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap(); - public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener){ + public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener) { return new SubmissionListener(listener); } @@ -66,7 +66,9 @@ public class QueryResultHandler { BufferingListener bl = new BufferingListener(); l = resultsListener.putIfAbsent(result.getQueryId(), bl); // if we had a succesful insert, use that reference. Otherwise, just throw away the new bufering listener. - if (l == null) l = bl; + if (l == null) { + l = bl; + } if (result.getQueryId().toString().equals("")) { failAll(); } @@ -125,7 +127,7 @@ public class QueryResultHandler { l.resultArrived(r, throttle); last = r.getHeader().getIsLastChunk(); } - if(ex != null){ + if (ex != null) { l.submissionFailed(ex); return true; } @@ -136,7 +138,9 @@ public class QueryResultHandler { @Override public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { this.throttle = throttle; - if(result.getHeader().getIsLastChunk()) finished = true; + if (result.getHeader().getIsLastChunk()) { + finished = true; + } synchronized (this) { if (output == null) { @@ -151,7 +155,7 @@ public class QueryResultHandler { public void submissionFailed(RpcException ex) { finished = true; synchronized (this) { - if (output == null){ + if (output == null) { this.ex = ex; } else{ output.submissionFailed(ex); @@ -159,7 +163,7 @@ public class QueryResultHandler { } } - public boolean isFinished(){ + public boolean isFinished() { return finished; } @@ -201,7 +205,9 @@ public class QueryResultHandler { resultsListener.remove(oldListener); } else { boolean replaced = resultsListener.replace(queryId, oldListener, listener); - if (!replaced) throw new IllegalStateException(); + if (!replaced) { + throw new IllegalStateException(); + } } } else { throw new IllegalStateException("Trying to replace a non-buffering User Results listener."); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index f352a15..4df6bfe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -99,9 +99,10 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand @Override protected void validateHandshake(BitToUserHandshake inbound) throws RpcException { // logger.debug("Handling handshake from bit to user. {}", inbound); - if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) + if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) { throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION)); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index 2710837..d196743 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -95,7 +95,7 @@ public class UserSession { return sessionOptions; } - public DrillUser getUser(){ + public DrillUser getUser() { return user; } @@ -105,9 +105,11 @@ public class UserSession { * @param schema The root schema to find this path within. * @return true if the path was set successfully. false if this path was unavailable. */ - public boolean setDefaultSchemaPath(String fullPath, SchemaPlus schema){ + public boolean setDefaultSchemaPath(String fullPath, SchemaPlus schema) { SchemaPlus newDefault = findSchema(schema, fullPath); - if(newDefault == null) return false; + if (newDefault == null) { + return false; + } setProp(SCHEMA, fullPath); return true; } @@ -117,11 +119,11 @@ public class UserSession { * @param rootSchema * @return A {@link net.hydromatic.optiq.SchemaPlus} object. */ - public SchemaPlus getDefaultSchema(SchemaPlus rootSchema){ + public SchemaPlus getDefaultSchema(SchemaPlus rootSchema) { return findSchema(rootSchema, getProp(SCHEMA)); } - public boolean setSessionOption(String name, String value){ + public boolean setSessionOption(String name, String value) { return true; } @@ -136,9 +138,11 @@ public class UserSession { private SchemaPlus findSchema(SchemaPlus rootSchema, String schemaPath) { String[] paths = schemaPath.split("\\."); SchemaPlus schema = rootSchema; - for(String p : paths){ + for (String p : paths) { schema = schema.getSubSchema(p); - if(schema == null) break; + if (schema == null) { + break; + } } return schema; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index a9e11a4..2125166 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -118,7 +118,9 @@ public class Drillbit implements Closeable{ } private void startJetty() throws Exception{ - if(embeddedJetty == null) return; + if (embeddedJetty == null) { + return; + } ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); @@ -155,7 +157,9 @@ public class Drillbit implements Closeable{ } public void close() { - if (coord != null && handle != null) coord.unregister(handle); + if (coord != null && handle != null) { + coord.unregister(handle); + } try { Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2); @@ -163,7 +167,9 @@ public class Drillbit implements Closeable{ logger.warn("Interrupted while sleeping during coordination deregistration."); } try { - if(embeddedJetty != null) embeddedJetty.stop(); + if (embeddedJetty != null) { + embeddedJetty.stop(); + } } catch (Exception e) { logger.warn("Failure while shutting down embedded jetty server."); } @@ -192,11 +198,12 @@ public class Drillbit implements Closeable{ } } - public ClusterCoordinator getCoordinator(){ + public ClusterCoordinator getCoordinator() { return coord; } - public DrillbitContext getContext(){ + public DrillbitContext getContext() { return this.manager.getContext(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java index 906e03d..96e9d8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java @@ -28,7 +28,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.local.LocalClusterCoordinator; import org.apache.drill.exec.memory.BufferAllocator; -public class RemoteServiceSet implements Closeable{ +public class RemoteServiceSet implements Closeable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class); private final DistributedCache cache; @@ -40,7 +40,6 @@ public class RemoteServiceSet implements Closeable{ this.coordinator = coordinator; } - public DistributedCache getCache() { return cache; } @@ -49,19 +48,20 @@ public class RemoteServiceSet implements Closeable{ return coordinator; } - @Override public void close() throws IOException { - try{ - cache.close(); - }catch(Exception e){ - if(e instanceof IOException) throw (IOException) e; + try { + cache.close(); + } catch(Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } throw new IOException("Failure while closing cache", e); } coordinator.close(); } - public static RemoteServiceSet getLocalServiceSet(){ + public static RemoteServiceSet getLocalServiceSet() { return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator()); } @@ -69,4 +69,5 @@ public class RemoteServiceSet implements Closeable{ ICache c = new ICache(config, allocator, true); return new RemoteServiceSet(c, new LocalClusterCoordinator()); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java index 7401246..13894ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java @@ -57,7 +57,7 @@ public class OptionValue{ return new OptionValue(Kind.DOUBLE, type, name, null, null, null, val); } - public OptionValue(){} + public OptionValue() {} public static OptionValue createOption(Kind kind, OptionType type, String name, String val) { switch (kind) { @@ -118,44 +118,58 @@ public class OptionValue{ @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } OptionValue other = (OptionValue) obj; if (bool_val == null) { - if (other.bool_val != null) + if (other.bool_val != null) { return false; - } else if (!bool_val.equals(other.bool_val)) + } + } else if (!bool_val.equals(other.bool_val)) { return false; + } if (float_val == null) { - if (other.float_val != null) + if (other.float_val != null) { return false; - } else if (!float_val.equals(other.float_val)) + } + } else if (!float_val.equals(other.float_val)) { return false; - if (kind != other.kind) + } + if (kind != other.kind) { return false; + } if (name == null) { - if (other.name != null) + if (other.name != null) { return false; - } else if (!name.equals(other.name)) + } + } else if (!name.equals(other.name)) { return false; + } if (num_val == null) { - if (other.num_val != null) + if (other.num_val != null) { return false; - } else if (!num_val.equals(other.num_val)) + } + } else if (!num_val.equals(other.num_val)) { return false; + } if (string_val == null) { - if (other.string_val != null) + if (other.string_val != null) { return false; - } else if (!string_val.equals(other.string_val)) + } + } else if (!string_val.equals(other.string_val)) { return false; - if (type != other.type) + } + if (type != other.type) { return false; + } return true; } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 40e2aaf..4fa61e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -36,7 +36,7 @@ import org.eigenbase.sql.SqlLiteral; import com.google.common.collect.Maps; -public class SystemOptionManager implements OptionManager{ +public class SystemOptionManager implements OptionManager { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class); @@ -93,7 +93,7 @@ public class SystemOptionManager implements OptionManager{ private final ConcurrentMap<String, OptionValidator> knownOptions = Maps.newConcurrentMap(); private final PStoreProvider provider; - public SystemOptionManager(DrillConfig config, PStoreProvider provider){ + public SystemOptionManager(DrillConfig config, PStoreProvider provider) { this.provider = provider; this.config = PStoreConfig // .newJacksonBuilder(config.getMapper(), OptionValue.class) // @@ -110,7 +110,7 @@ public class SystemOptionManager implements OptionManager{ private class Iter implements Iterator<OptionValue>{ private Iterator<Map.Entry<String, OptionValue>> inner; - public Iter(Iterator<Map.Entry<String, OptionValue>> inner){ + public Iter(Iterator<Map.Entry<String, OptionValue>> inner) { this.inner = inner; } @@ -172,8 +172,8 @@ public class SystemOptionManager implements OptionManager{ private class SystemOptionAdmin implements OptionAdmin{ - public SystemOptionAdmin(){ - for(OptionValidator v : VALIDATORS){ + public SystemOptionAdmin() { + for(OptionValidator v : VALIDATORS) { knownOptions.put(v.getOptionName(), v); options.putIfAbsent(v.getOptionName(), v.getDefault()); } @@ -182,7 +182,7 @@ public class SystemOptionManager implements OptionManager{ @Override public void registerOptionType(OptionValidator validator) { - if(null != knownOptions.putIfAbsent(validator.getOptionName(), validator) ){ + if (null != knownOptions.putIfAbsent(validator.getOptionName(), validator) ) { throw new IllegalArgumentException("Only one option is allowed to be registered with name: " + validator.getOptionName()); } } @@ -190,20 +190,21 @@ public class SystemOptionManager implements OptionManager{ @Override public void validate(OptionValue v) throws SetOptionException { OptionValidator validator = knownOptions.get(v.name); - if(validator == null) throw new SetOptionException("Unknown option " + v.name); + if (validator == null) { + throw new SetOptionException("Unknown option " + v.name); + } validator.validate(v); } @Override public OptionValue validate(String name, SqlLiteral value) throws SetOptionException { OptionValidator validator = knownOptions.get(name); - if(validator == null) throw new SetOptionException("Unknown option: " + name); + if (validator == null) { + throw new SetOptionException("Unknown option: " + name); + } return validator.validate(value); } - - } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java index 0398215..a0afd29 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java @@ -40,9 +40,10 @@ public class TypeValidators { @Override public void validate(OptionValue v) throws ExpressionParsingException { super.validate(v); - if (v.num_val > max || v.num_val < 0) + if (v.num_val > max || v.num_val < 0) { throw new ExpressionParsingException(String.format("Option %s must be between %d and %d.", getOptionName(), 0, max)); + } } } @@ -55,8 +56,9 @@ public class TypeValidators { @Override public void validate(OptionValue v) throws ExpressionParsingException { super.validate(v); - if (!isPowerOfTwo(v.num_val)) + if (!isPowerOfTwo(v.num_val)) { throw new ExpressionParsingException(String.format("Option %s must be a power of two.", getOptionName())); + } } private boolean isPowerOfTwo(long num) { @@ -77,36 +79,37 @@ public class TypeValidators { @Override public void validate(OptionValue v) throws ExpressionParsingException { super.validate(v); - if (v.float_val > max || v.float_val < min) + if (v.float_val > max || v.float_val < min) { throw new ExpressionParsingException(String.format("Option %s must be between %d and %d.", getOptionName(), min, max)); + } } } public static class BooleanValidator extends TypeValidator{ - public BooleanValidator(String name, boolean def){ + public BooleanValidator(String name, boolean def) { super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def)); } } + public static class StringValidator extends TypeValidator{ - public StringValidator(String name, String def){ + public StringValidator(String name, String def) { super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def)); } } + public static class LongValidator extends TypeValidator{ - public LongValidator(String name, long def){ + public LongValidator(String name, long def) { super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def)); } } - public static class DoubleValidator extends TypeValidator{ - public DoubleValidator(String name, double def){ + public static class DoubleValidator extends TypeValidator{ + public DoubleValidator(String name, double def) { super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def)); } - - } public static abstract class TypeValidator extends OptionValidator { @@ -133,9 +136,10 @@ public class TypeValidators { @Override public void validate(OptionValue v) throws ExpressionParsingException { - if (v.kind != kind) + if (v.kind != kind) { throw new ExpressionParsingException(String.format("Option %s must be of type %s but you tried to set to %s.", getOptionName(), kind.name(), v.kind.name())); + } } public void extraValidate(OptionValue v) throws ExpressionParsingException { @@ -174,4 +178,5 @@ public class TypeValidators { throw new ExpressionParsingException(String.format( "Drill doesn't support set option expressions with literals of type %s.", literal.getTypeName())); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java index 5d7ea2c..98e460a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ResourceInputStream.java @@ -27,14 +27,10 @@ import org.apache.hadoop.fs.Seekable; public class ResourceInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceInputStream.class); - public ResourceInputStream(byte[] bytes) { super(bytes); } - - - @Override public void readFully(long position, byte[] buffer) throws IOException { int l = read(position, buffer, 0, buffer.length); @@ -64,12 +60,14 @@ public class ResourceInputStream extends ByteArrayInputStream implements Seekabl } System.arraycopy(buf, start, b, off, len); return len; -} + } @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { int l = read(position, buffer, offset, length); - if (l < length) throw new EOFException(); + if (l < length) { + throw new EOFException(); + } } @Override @@ -77,7 +75,6 @@ public class ResourceInputStream extends ByteArrayInputStream implements Seekabl return pos; } - @Override public int read(byte[] b) throws IOException { int l = read(pos, b, 0, b.length); @@ -91,14 +88,11 @@ public class ResourceInputStream extends ByteArrayInputStream implements Seekabl return true; } - - - @Override public void seek(long arg0) throws IOException { - if(buf.length > arg0){ + if (buf.length > arg0) { pos = (int) arg0; - }else{ + } else { throw new EOFException(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index a876ea5..e0c14a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -206,19 +206,25 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage throw new ExecutionSetupException("Two processes tried to change a plugin at the same time."); } - if(persist) pluginSystemTable.put(name, config); + if (persist) { + pluginSystemTable.put(name, config); + } return newPlugin; } public StoragePlugin getPlugin(String name) throws ExecutionSetupException { StoragePlugin plugin = plugins.get(name); - if(name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) return plugin; + if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) { + return plugin; + } // since we lazily manage the list of plugins per server, we need to update this once we know that it is time. StoragePluginConfig config = this.pluginSystemTable.get(name); if (config == null) { - if(plugin != null) plugins.remove(name); + if (plugin != null) { + plugins.remove(name); + } return null; } else { if (plugin == null || !plugin.getConfig().equals(config)) { @@ -239,7 +245,9 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException { StoragePlugin p = getPlugin(storageConfig); - if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName())); + if (!(p instanceof FileSystemPlugin)) { + throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName())); + } FileSystemPlugin storage = (FileSystemPlugin) p; return storage.getFormatPlugin(formatConfig); } @@ -256,8 +264,9 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage return plugin; } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e; - if (t instanceof ExecutionSetupException) + if (t instanceof ExecutionSetupException) { throw ((ExecutionSetupException) t); + } throw new ExecutionSetupException(String.format( "Failure setting up new storage plugin configuration for config %s", pluginConfig), t); } @@ -304,7 +313,9 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage } // remove those which are no longer in the registry for (String pluginName : currentPluginNames) { - if(pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) continue; + if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) { + continue; + } plugins.remove(pluginName); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java index dda2dfc..2ba2910 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java @@ -54,7 +54,7 @@ public class BasicFormatMatcher extends FormatMatcher{ this.codecFactory = null; } - public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible){ + public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible) { List<Pattern> patterns = Lists.newArrayList(); for (String extension : extensions) { patterns.add(Pattern.compile(".*\\." + extension)); @@ -74,7 +74,7 @@ public class BasicFormatMatcher extends FormatMatcher{ @Override public FormatSelection isReadable(FileSelection selection) throws IOException { - if(isReadable(selection.getFirstPath(fs))){ + if (isReadable(selection.getFirstPath(fs))) { if (plugin.getName() != null) { NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig(); namedConfig.name = plugin.getName(); @@ -98,13 +98,15 @@ public class BasicFormatMatcher extends FormatMatcher{ } else { fileName = status.getPath().toString(); } - for(Pattern p : patterns){ - if(p.matcher(fileName).matches()){ + for (Pattern p : patterns) { + if (p.matcher(fileName).matches()) { return true; } } - if(matcher.matches(status)) return true; + if (matcher.matches(status)) { + return true; + } return false; } @@ -116,32 +118,37 @@ public class BasicFormatMatcher extends FormatMatcher{ } - private class MagicStringMatcher{ + private class MagicStringMatcher { private List<RangeMagics> ranges; - public MagicStringMatcher(List<MagicString> magicStrings){ + public MagicStringMatcher(List<MagicString> magicStrings) { ranges = Lists.newArrayList(); - for(MagicString ms : magicStrings){ + for(MagicString ms : magicStrings) { ranges.add(new RangeMagics(ms)); } } public boolean matches(FileStatus status) throws IOException{ - if(ranges.isEmpty()) return false; + if (ranges.isEmpty()) { + return false; + } final Range<Long> fileRange = Range.closedOpen( 0L, status.getLen()); - try(FSDataInputStream is = fs.open(status.getPath()).getInputStream()){ - for(RangeMagics rMagic : ranges){ + try (FSDataInputStream is = fs.open(status.getPath()).getInputStream()) { + for(RangeMagics rMagic : ranges) { Range<Long> r = rMagic.range; - if(!fileRange.encloses(r)) continue; + if (!fileRange.encloses(r)) { + continue; + } int len = (int) (r.upperEndpoint() - r.lowerEndpoint()); byte[] bytes = new byte[len]; is.readFully(r.lowerEndpoint(), bytes); - for(byte[] magic : rMagic.magics){ - if(Arrays.equals(magic, bytes)) return true; + for (byte[] magic : rMagic.magics) { + if (Arrays.equals(magic, bytes)) { + return true; + } } - } } return false; @@ -151,10 +158,11 @@ public class BasicFormatMatcher extends FormatMatcher{ Range<Long> range; byte[][] magics; - public RangeMagics(MagicString ms){ + public RangeMagics(MagicString ms) { this.range = Range.closedOpen( ms.getOffset(), (long) ms.getBytes().length); this.magics = new byte[][]{ms.getBytes()}; } } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java index 76f6be4..36e7efe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java @@ -53,7 +53,7 @@ public class FileSelection { this.selectionRoot = selectionRoot; } - public FileSelection(List<String> files, boolean dummy){ + public FileSelection(List<String> files, boolean dummy) { this.files = files; } @@ -73,7 +73,9 @@ public class FileSelection { public boolean containsDirectories(DrillFileSystem fs) throws IOException { init(fs); for (FileStatus p : statuses) { - if (p.isDir()) return true; + if (p.isDir()) { + return true; + } } return false; } @@ -99,11 +101,15 @@ public class FileSelection { return statuses.get(0); } - public List<String> getAsFiles(){ - if(!files.isEmpty()) return files; - if(statuses == null) return Collections.emptyList(); + public List<String> getAsFiles() { + if (!files.isEmpty()) { + return files; + } + if (statuses == null) { + return Collections.emptyList(); + } List<String> files = Lists.newArrayList(); - for(FileStatus s : statuses){ + for (FileStatus s : statuses) { files.add(s.getPath().toString()); } return files; @@ -131,7 +137,9 @@ public class FileSelection { } else { Path p = new Path(parent,removeLeadingSlash(path)); FileStatus[] status = fs.getUnderlying().globStatus(p); - if(status == null || status.length == 0) return null; + if (status == null || status.length == 0) { + return null; + } String[] s = p.toUri().getPath().split("/"); String newPath = StringUtils.join(ArrayUtils.subarray(s, 0, s.length - 1), "/"); Preconditions.checkState(!newPath.contains("*") && !newPath.contains("?"), String.format("Unsupported selection path: %s", p)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index ec9a04e..b0855c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -60,7 +60,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ private final DrillFileSystem fs; public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{ - try{ + try { this.config = config; this.context = context; @@ -72,18 +72,18 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ this.formatsByName = FormatCreator.getFormatPlugins(context, fs, config); List<FormatMatcher> matchers = Lists.newArrayList(); formatPluginsByConfig = Maps.newHashMap(); - for(FormatPlugin p : formatsByName.values()){ + for (FormatPlugin p : formatsByName.values()) { matchers.add(p.getMatcher()); formatPluginsByConfig.put(p.getConfig(), p); } List<WorkspaceSchemaFactory> factories; - if(config.workspaces == null || config.workspaces.isEmpty()){ + if (config.workspaces == null || config.workspaces.isEmpty()) { factories = Collections.singletonList( new WorkspaceSchemaFactory(context.getConfig(), context.getPersistentStoreProvider(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers)); - }else{ + } else { factories = Lists.newArrayList(); - for(Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()){ + for (Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()) { factories.add(new WorkspaceSchemaFactory(context.getConfig(), context.getPersistentStoreProvider(), this, space.getKey(), name, fs, space.getValue(), matchers)); } @@ -93,7 +93,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ } } this.schemaFactory = new FileSystemSchemaFactory(name, factories); - }catch(IOException e){ + } catch (IOException e) { throw new ExecutionSetupException("Failure setting up file system plugin.", e); } } @@ -112,12 +112,14 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class); FormatPlugin plugin; - if(formatSelection.getFormat() instanceof NamedFormatPluginConfig){ + if (formatSelection.getFormat() instanceof NamedFormatPluginConfig) { plugin = formatsByName.get( ((NamedFormatPluginConfig) formatSelection.getFormat()).name); - }else{ + } else { plugin = formatPluginsByConfig.get(formatSelection.getFormat()); } - if(plugin == null) throw new IOException(String.format("Failure getting requested format plugin named '%s'. It was not one of the format plugins registered.", formatSelection.getFormat())); + if (plugin == null) { + throw new IOException(String.format("Failure getting requested format plugin named '%s'. It was not one of the format plugins registered.", formatSelection.getFormat())); + } return plugin.getGroupScan(formatSelection.getSelection(), columns); } @@ -126,15 +128,16 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ schemaFactory.registerSchemas(session, parent); } - public FormatPlugin getFormatPlugin(String name){ + public FormatPlugin getFormatPlugin(String name) { return formatsByName.get(name); } - public FormatPlugin getFormatPlugin(FormatPluginConfig config){ - if(config instanceof NamedFormatPluginConfig){ + public FormatPlugin getFormatPlugin(FormatPluginConfig config) { + if (config instanceof NamedFormatPluginConfig) { return formatsByName.get(((NamedFormatPluginConfig) config).name); - }else{ + } else { return formatPluginsByConfig.get(config); } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java index 0d0d46a..e5c0487 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java @@ -36,66 +36,64 @@ import com.google.common.collect.Maps; public class FormatCreator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class); - static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class); static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class); - static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig){ + static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig) { final DrillConfig config = context.getConfig(); Map<String, FormatPlugin> plugins = Maps.newHashMap(); Collection<Class<? extends FormatPlugin>> pluginClasses = PathScanner.scanForImplementations(FormatPlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES)); - if(storageConfig.formats == null || storageConfig.formats.isEmpty()){ - - for(Class<? extends FormatPlugin> pluginClass: pluginClasses){ - for(Constructor<?> c : pluginClass.getConstructors()){ - try{ + if (storageConfig.formats == null || storageConfig.formats.isEmpty()) { - if(!DEFAULT_BASED.check(c)) continue; + for (Class<? extends FormatPlugin> pluginClass: pluginClasses) { + for (Constructor<?> c : pluginClass.getConstructors()) { + try { + if (!DEFAULT_BASED.check(c)) { + continue; + } FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fileSystem, storageConfig); plugins.put(plugin.getName(), plugin); - }catch(Exception e){ + } catch (Exception e) { logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e); } } } - }else{ - + } else { Map<Class<?>, Constructor<?>> constructors = Maps.newHashMap(); - for(Class<? extends FormatPlugin> pluginClass: pluginClasses){ - for(Constructor<?> c : pluginClass.getConstructors()){ - try{ - if(!FORMAT_BASED.check(c)) continue; + for (Class<? extends FormatPlugin> pluginClass: pluginClasses) { + for (Constructor<?> c : pluginClass.getConstructors()) { + try { + if (!FORMAT_BASED.check(c)) { + continue; + } Class<? extends FormatPluginConfig> configClass = (Class<? extends FormatPluginConfig>) c.getParameterTypes()[4]; constructors.put(configClass, c); - }catch(Exception e){ + } catch (Exception e) { logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e); } } } - for(Map.Entry<String, FormatPluginConfig> e : storageConfig.formats.entrySet()){ + for (Map.Entry<String, FormatPluginConfig> e : storageConfig.formats.entrySet()) { Constructor<?> c = constructors.get(e.getValue().getClass()); - if(c == null){ + if (c == null) { logger.warn("Unable to find constructor for storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName()); continue; } - try{ - plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue())); + try { + plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue())); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) { logger.warn("Failure initializing storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName(), e1); } } - } return plugins; } - - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java index 4e7fb8f..2103a96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java @@ -57,8 +57,9 @@ public class WorkspaceConfig { @Override public boolean equals(Object obj) { - if (obj == this) + if (obj == this) { return true; + } if (obj == null || !(obj instanceof WorkspaceConfig)) { return false; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java index 4349fe0..03a6966 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java @@ -68,8 +68,6 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa private final PStore<String> knownViews; private final ObjectMapper mapper; - - public WorkspaceSchemaFactory(DrillConfig drillConfig, PStoreProvider provider, FileSystemPlugin plugin, String schemaName, String storageEngineName, DrillFileSystem fileSystem, WorkspaceConfig config, List<FormatMatcher> formatMatchers) throws ExecutionSetupException, IOException { @@ -84,10 +82,10 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa this.schemaName = schemaName; // setup cache - if(storageEngineName == null){ + if (storageEngineName == null) { this.knownViews = null; // this.knownPaths = null; - }else{ + } else { this.knownViews = provider.getPStore(PStoreConfig // .newJacksonBuilder(drillConfig.getMapper(), String.class) // .name(Joiner.on('.').join("storage.views", storageEngineName, schemaName)) // @@ -109,7 +107,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa } - private Path getViewPath(String name){ + private Path getViewPath(String name) { return new Path(config.getLocation() + '/' + name + ".view.drill"); } @@ -122,14 +120,17 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa try { FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key); - if(fileSelection == null) return null; + if (fileSelection == null) { + return null; + } if (fileSelection.containsDirectories(fs)) { for (FormatMatcher m : dirMatchers) { try { Object selection = m.isReadable(fileSelection); - if (selection != null) + if (selection != null) { return new DynamicDrillTable(plugin, storageEngineName, selection); + } } catch (IOException e) { logger.debug("File read failed.", e); } @@ -139,8 +140,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa for (FormatMatcher m : fileMatchers) { Object selection = m.isReadable(fileSelection); - if (selection != null) + if (selection != null) { return new DynamicDrillTable(plugin, storageEngineName, selection); + } } return null; @@ -160,10 +162,12 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa public boolean createView(View view) throws Exception { Path viewPath = getViewPath(view.getName()); boolean replaced = fs.getUnderlying().exists(viewPath); - try(DrillOutputStream stream = fs.create(viewPath)){ + try (DrillOutputStream stream = fs.create(viewPath)) { mapper.writeValue(stream.getOuputStream(), view); } - if(knownViews != null) knownViews.put(view.getName(), viewPath.toString()); + if (knownViews != null) { + knownViews.put(view.getName(), viewPath.toString()); + } return replaced; } @@ -174,7 +178,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa public void dropView(String viewName) throws IOException { fs.getUnderlying().delete(getViewPath(viewName), false); - if(knownViews != null) knownViews.delete(viewName); + if (knownViews != null) { + knownViews.delete(viewName); + } } private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(WorkspaceSchemaFactory.this); @@ -230,7 +236,9 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa @Override public Table getTable(String name) { // first check existing tables. - if(tables.alreadyContainsKey(name)) return tables.get(name); + if(tables.alreadyContainsKey(name)) { + return tables.get(name); + } // then check known views. // String path = knownViews.get(name); @@ -239,8 +247,8 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa List<DotDrillFile> files; try { files = DotDrillUtil.getDotDrills(fs, new Path(config.getLocation()), name, DotDrillType.VIEW); - for(DotDrillFile f : files){ - switch(f.getType()){ + for(DotDrillFile f : files) { + switch(f.getType()) { case VIEW: return new DrillViewTable(schemaPath, getView(f)); } @@ -271,10 +279,11 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa public CreateTableEntry createNewTable(String tableName) { String storage = session.getOptions().getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val; FormatPlugin formatPlugin = plugin.getFormatPlugin(storage); - if (formatPlugin == null) + if (formatPlugin == null) { throw new UnsupportedOperationException( String.format("Unsupported format '%s' in workspace '%s'", config.getStorageFormat(), Joiner.on(".").join(getSchemaPath()))); + } return new FileSystemCreateTableEntry( (FileSystemConfig) plugin.getConfig(), http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 1341fa4..8efcd2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -123,7 +123,7 @@ public class EasyGroupScan extends AbstractGroupScan{ @Override public ScanStats getScanStats() { long data =0; - for(CompleteFileWork work : chunks){ + for (CompleteFileWork work : chunks) { data += work.getTotalBytes(); } @@ -137,13 +137,13 @@ public class EasyGroupScan extends AbstractGroupScan{ } @JsonProperty("columns") - public List<SchemaPath> getColumns(){ + public List<SchemaPath> getColumns() { return columns; } @JsonIgnore - public FileSelection getFileSelection(){ + public FileSelection getFileSelection() { return selection; } @@ -183,21 +183,21 @@ public class EasyGroupScan extends AbstractGroupScan{ return new EasySubScan(convert(filesForMinor), formatPlugin, columns, selectionRoot); } - private List<FileWorkImpl> convert(List<CompleteFileWork> list){ + private List<FileWorkImpl> convert(List<CompleteFileWork> list) { List<FileWorkImpl> newList = Lists.newArrayList(); - for(CompleteFileWork f : list){ + for (CompleteFileWork f : list) { newList.add(f.getAsFileWork()); } return newList; } @JsonProperty("storage") - public StoragePluginConfig getStorageConfig(){ + public StoragePluginConfig getStorageConfig() { return formatPlugin.getStorageConfig(); } @JsonProperty("format") - public FormatPluginConfig getFormatConfig(){ + public FormatPluginConfig getFormatConfig() { return formatPlugin.getConfig(); } @@ -213,7 +213,9 @@ public class EasyGroupScan extends AbstractGroupScan{ @Override public GroupScan clone(List<SchemaPath> columns) { - if(!formatPlugin.supportsPushDown()) throw new IllegalStateException(String.format("%s doesn't support pushdown.", this.getClass().getSimpleName())); + if (!formatPlugin.supportsPushDown()) { + throw new IllegalStateException(String.format("%s doesn't support pushdown.", this.getClass().getSimpleName())); + } EasyGroupScan newScan = new EasyGroupScan(this); newScan.columns = columns; return newScan; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 8cd7cf2..e1165a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -90,12 +90,13 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } else if (obj == null) { return false; - if (getClass() == obj.getClass()) + } else if (getClass() == obj.getClass()) { return true; + } return false; } @@ -116,5 +117,4 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { return true; } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index ff70ccd..b64a032 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -117,15 +117,18 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } else if (obj == null) { return false; - if (!(obj instanceof TextFormatConfig)) + } else if (!(obj instanceof TextFormatConfig)) { return false; + } + TextFormatConfig that = (TextFormatConfig) obj; - if (this.delimiter.equals(that.delimiter)) + if (this.delimiter.equals(that.delimiter)) { return true; + } return false; } @@ -145,4 +148,5 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm public boolean supportsPushDown() { return true; } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java index 21923d8..5736df8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java @@ -58,7 +58,7 @@ public class MockGroupScanPOP extends AbstractGroupScan { this.url = url; } - public ScanStats getScanStats(){ + public ScanStats getScanStats() { return ScanStats.TRIVIAL_TABLE; } @@ -83,7 +83,7 @@ public class MockGroupScanPOP extends AbstractGroupScan { this.records = records; this.types = types; int size = 0; - for(MockColumn dt : types){ + for (MockColumn dt : types) { size += TypeHelper.getSize(dt.getMajorType()); } this.recordSize = size; @@ -144,13 +144,19 @@ public class MockGroupScanPOP extends AbstractGroupScan { } @JsonIgnore - public MajorType getMajorType(){ + public MajorType getMajorType() { MajorType.Builder b = MajorType.newBuilder(); b.setMode(mode); b.setMinorType(minorType); - if(precision != null) b.setPrecision(precision); - if(width != null) b.setWidth(width); - if(scale != null) b.setScale(scale); + if (precision != null) { + b.setPrecision(precision); + } + if (width != null) { + b.setWidth(width); + } + if (scale != null) { + b.setScale(scale); + } return b.build(); } @@ -174,10 +180,12 @@ public class MockGroupScanPOP extends AbstractGroupScan { mappings = new LinkedList[endpoints.size()]; int i =0; - for(MockScanEntry e : this.getReadEntries()){ - if(i == endpoints.size()) i -= endpoints.size(); + for (MockScanEntry e : this.getReadEntries()) { + if (i == endpoints.size()) { + i -= endpoints.size(); + } LinkedList<MockScanEntry> entries = mappings[i]; - if(entries == null){ + if (entries == null) { entries = new LinkedList<MockScanEntry>(); mappings[i] = entries; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index 66851a9..43e6416 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -103,12 +103,14 @@ public class MockRecordReader extends AbstractRecordReader { @Override public int next() { - if(recordsRead >= this.config.getRecords()) return 0; + if (recordsRead >= this.config.getRecords()) { + return 0; + } int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead); recordsRead += recordSetSize; - for(ValueVector v : valueVectors){ + for (ValueVector v : valueVectors) { // logger.debug(String.format("MockRecordReader: Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName())); ValueVector.Mutator m = v.getMutator(); @@ -132,4 +134,5 @@ public class MockRecordReader extends AbstractRecordReader { @Override public void cleanup() { } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java index c566182..2f7ea18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java @@ -44,12 +44,18 @@ public class MockStorageEngineConfig extends StoragePluginConfigBase{ @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } MockStorageEngineConfig that = (MockStorageEngineConfig) o; - if (url != null ? !url.equals(that.url) : that.url != null) return false; + if (url != null ? !url.equals(that.url) : that.url != null) { + return false; + } return true; } @@ -58,4 +64,5 @@ public class MockStorageEngineConfig extends StoragePluginConfigBase{ public int hashCode() { return url != null ? url.hashCode() : 0; } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index a768fc9..86e5224 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -126,7 +126,9 @@ public class ParquetGroupScan extends AbstractGroupScan { @JsonProperty("selectionRoot") String selectionRoot // ) throws IOException, ExecutionSetupException { this.columns = columns; - if(formatConfig == null) formatConfig = new ParquetFormatConfig(); + if (formatConfig == null) { + formatConfig = new ParquetFormatConfig(); + } Preconditions.checkNotNull(storageConfig); Preconditions.checkNotNull(formatConfig); this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig); @@ -154,7 +156,7 @@ public class ParquetGroupScan extends AbstractGroupScan { this.fs = formatPlugin.getFileSystem().getUnderlying(); this.entries = Lists.newArrayList(); - for(FileStatus file : files){ + for (FileStatus file : files) { entries.add(new ReadEntryWithPath(file.getPath().toString())); } @@ -166,7 +168,7 @@ public class ParquetGroupScan extends AbstractGroupScan { /* * This is used to clone another copy of the group scan. */ - private ParquetGroupScan(ParquetGroupScan that){ + private ParquetGroupScan(ParquetGroupScan that) { this.columns = that.columns; this.endpointAffinities = that.endpointAffinities; this.entries = that.entries; @@ -182,7 +184,7 @@ public class ParquetGroupScan extends AbstractGroupScan { private void readFooterFromEntries() throws IOException { List<FileStatus> files = Lists.newArrayList(); - for(ReadEntryWithPath e : entries){ + for (ReadEntryWithPath e : entries) { files.add(fs.getFileStatus(new Path(e.getPath()))); } readFooter(files); @@ -299,7 +301,7 @@ public class ParquetGroupScan extends AbstractGroupScan { if (this.endpointAffinities == null) { BlockMapBuilder bmb = new BlockMapBuilder(fs, formatPlugin.getContext().getBits()); - try{ + try { for (RowGroupInfo rgi : rowGroupInfos) { EndpointByteMap ebm = bmb.getEndpointByteMap(rgi); rgi.setEndpointByteMap(ebm); @@ -318,7 +320,6 @@ public class ParquetGroupScan extends AbstractGroupScan { public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException { this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos); - } @Override @@ -335,9 +336,7 @@ public class ParquetGroupScan extends AbstractGroupScan { return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot); } - - - private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups){ + private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) { List<RowGroupReadEntry> entries = Lists.newArrayList(); for (RowGroupInfo rgi : rowGroups) { RowGroupReadEntry rgre = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), @@ -347,7 +346,6 @@ public class ParquetGroupScan extends AbstractGroupScan { return entries; } - @Override public int getMaxParallelizationWidth() { return rowGroupInfos.size(); @@ -357,7 +355,6 @@ public class ParquetGroupScan extends AbstractGroupScan { return columns; } - @Override public ScanStats getScanStats() { int columnCount = columns == null ? 20 : columns.size(); @@ -403,4 +400,5 @@ public class ParquetGroupScan extends AbstractGroupScan { public long getColumnValueCount(SchemaPath column) { return columnValueCounts.containsKey(column) ? columnValueCounts.get(column) : 0; } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java index b629dda..2424fac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -139,18 +139,21 @@ public abstract class ColumnReader<V extends ValueVector> { public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException { boolean doneReading = readPage(); - if (doneReading) + if (doneReading) { return true; + } doneReading = processPageData((int) recordsReadInCurrentPass); - if (doneReading) + if (doneReading) { return true; + } lengthVarFieldsInCurrentRecord += dataTypeLengthInBits; doneReading = checkVectorCapacityReached(); - if (doneReading) + if (doneReading) { return true; + } return false; } @@ -189,8 +192,9 @@ public abstract class ColumnReader<V extends ValueVector> { if (pageReader.currentPage == null || totalValuesReadAndReadyToReadInPage() == pageReader.currentPage.getValueCount()) { readRecords(pageReader.valuesReadyToRead); - if (pageReader.currentPage != null) + if (pageReader.currentPage != null) { totalValuesRead += pageReader.currentPage.getValueCount(); + } if (!pageReader.next()) { hitRowGroupEnd(); return true; @@ -215,9 +219,10 @@ public abstract class ColumnReader<V extends ValueVector> { logger.debug("Reached the capacity of the data vector in a variable length value vector."); return true; } - else if (valuesReadInCurrentPass > valueVec.getValueCapacity()){ + else if (valuesReadInCurrentPass > valueVec.getValueCapacity()) { return true; } return false; } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java index 2fc3d6e..0c4437a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -91,8 +91,9 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { pageReader.valuesReadyToRead += repeatedValuesInCurrentList; repeatedGroupsReadInCurrentPass++; currDictVal = null; - if ( ! notFishedReadingList) + if ( ! notFishedReadingList) { repeatedValuesInCurrentList = -1; + } } @Override @@ -112,8 +113,9 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { public void postPageRead() { super.postPageRead(); // this is no longer correct as we figured out that lists can reach across pages - if ( ! notFishedReadingList) + if ( ! notFishedReadingList) { repeatedValuesInCurrentList = -1; + } definitionLevelsRead = 0; } @@ -130,12 +132,14 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { @Override protected boolean checkVectorCapacityReached() { boolean doneReading = super.checkVectorCapacityReached(); - if (doneReading) + if (doneReading) { return true; - if (valuesReadInCurrentPass + pageReader.valuesReadyToRead + repeatedValuesInCurrentList >= valueVec.getValueCapacity()) + } + if (valuesReadInCurrentPass + pageReader.valuesReadyToRead + repeatedValuesInCurrentList >= valueVec.getValueCapacity()) { return true; - else + } else { return false; + } } @Override @@ -163,7 +167,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { definitionLevelsRead++; } int repLevel; - if ( columnDescriptor.getMaxDefinitionLevel() == currDefLevel){ + if ( columnDescriptor.getMaxDefinitionLevel() == currDefLevel) { if (repeatedValuesInCurrentList == -1 || notFishedReadingList) { repeatedValuesInCurrentList = 1; do { @@ -178,7 +182,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { // check that we have not hit the end of the row group (in which case we will not find the repetition level indicating // the end of this record as there is no next page to check, we have read all the values in this repetition so it is okay // to add it to the read ) - if (totalValuesRead + pageReader.valuesReadyToRead + repeatedValuesInCurrentList != columnChunkMetaData.getValueCount()){ + if (totalValuesRead + pageReader.valuesReadyToRead + repeatedValuesInCurrentList != columnChunkMetaData.getValueCount()) { notFishedReadingList = true; // if we hit this case, we cut off the current batch at the previous value, these extra values as well // as those that spill into the next page will be added to the next batch @@ -188,8 +192,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { } } while (repLevel != 0); } - } - else { + } else { repeatedValuesInCurrentList = 0; } int currentValueListLength = repeatedValuesInCurrentList; @@ -209,7 +212,9 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { @Override protected void readRecords(int valuesToRead) { - if (valuesToRead == 0) return; + if (valuesToRead == 0) { + return; + } // TODO - validate that this works in all cases, it fixes a bug when reading from multiple pages into // a single vector dataReader.valuesReadInCurrentPass = 0; @@ -228,5 +233,5 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { super.clear(); dataReader.clear(); } -} +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/676f5df6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java index 47d64bc..2e24674 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java @@ -69,7 +69,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten if ( currDefLevel == -1 ) { currDefLevel = pageReader.definitionLevels.readInteger(); } - if ( columnDescriptor.getMaxDefinitionLevel() > currDefLevel){ + if ( columnDescriptor.getMaxDefinitionLevel() > currDefLevel) { nullsRead++; // set length of zero, each index in the vector defaults to null so no need to set the nullability variableWidthVector.getMutator().setValueLengthSafe( @@ -93,14 +93,15 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten // I think this also needs to happen if it is null for the random access boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageDataByteArray, (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits); - if ( ! success ) + if ( ! success ) { return true; + } return false; } @Override public void updateReadyToReadPosition() { - if (! currentValNull){ + if (! currentValNull) { pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4; } pageReader.valuesReadyToRead++; @@ -109,7 +110,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten @Override public void updatePosition() { - if (! currentValNull){ + if (! currentValNull) { pageReader.readPosInBytes += dataTypeLengthInBits + 4; bytesReadInCurrentPass += dataTypeLengthInBits; } @@ -128,11 +129,12 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass); currentValNull = variableWidthVector.getAccessor().getObject(valuesReadInCurrentPass) == null; // again, I am re-purposing the unused field here, it is a length n BYTES, not bits - if (! currentValNull){ + if (! currentValNull) { boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageDataByteArray, (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits); assert success; } updatePosition(); } + }
