This is an automated email from the ASF dual-hosted git repository. dzamo pushed a commit to branch 1.21 in repository https://gitbox.apache.org/repos/asf/drill.git
commit 76b72bedd66097603e1f955c87b1322a3d3ce366 Author: James Turton <9107319+jntur...@users.noreply.github.com> AuthorDate: Wed Mar 29 17:11:39 2023 +0200 DRILL-8409: Support the configuration of bind addresses for network services (#2777) * Add support for setting RPC and HTTP bind addresses using boot options. * Safely log Parquet batch read timing. * Safely close page write store and column write store in ParquetRecordWriter. --- exec/java-exec/pom.xml | 2 +- .../java/org/apache/drill/exec/ExecConstants.java | 2 ++ .../drill/exec/rpc/control/ControllerImpl.java | 7 +++-- .../drill/exec/rpc/data/DataConnectionCreator.java | 10 ++++--- .../apache/drill/exec/server/rest/WebServer.java | 31 +++++++++++++++------- .../apache/drill/exec/service/ServiceEngine.java | 16 +++++++---- .../exec/store/parquet/ParquetRecordWriter.java | 13 +++++++-- .../parquet/columnreaders/ParquetRecordReader.java | 9 ++++++- .../hadoop/ParquetColumnChunkPageWriteStore.java | 3 ++- .../java-exec/src/main/resources/drill-module.conf | 4 ++- .../drill/exec/rpc/data/TestBitBitKerberos.java | 7 ++--- .../org/apache/drill/exec/rpc/data/TestBitRpc.java | 7 +++-- .../org/apache/drill/exec/rpc/BasicServer.java | 4 +-- 13 files changed, 83 insertions(+), 32 deletions(-) diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 69d8210901..8aa2d194cd 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -936,7 +936,7 @@ </execution> </executions> </plugin> - <plugin> <!-- generate the parser (Parser.jj is itself generated wit fmpp above) --> + <plugin> <!-- generate the parser (Parser.jj is itself generated with fmpp above) --> <groupId>org.codehaus.mojo</groupId> <artifactId>javacc-maven-plugin</artifactId> <version>2.6</version> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 9b32005826..8fd01be1ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -59,6 +59,7 @@ public final class ExecConstants { public static final String BIT_RETRY_DELAY = "drill.exec.rpc.bit.server.retry.delay"; public static final String BIT_TIMEOUT = "drill.exec.bit.timeout"; public static final String SERVICE_NAME = "drill.exec.cluster-id"; + public static final String RPC_BIND_ADDR = "drill.exec.rpc.bind_addr"; public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port"; public static final String INITIAL_DATA_PORT = "drill.exec.rpc.bit.server.dataport"; public static final String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout"; @@ -222,6 +223,7 @@ public final class ExecConstants { public static final String HTTP_ENABLE = "drill.exec.http.enabled"; public static final String HTTP_MAX_PROFILES = "drill.exec.http.max_profiles"; public static final String HTTP_PROFILES_PER_PAGE = "drill.exec.http.profiles_per_page"; + public static final String HTTP_BIND_ADDR = "drill.exec.http.bind_addr"; public static final String HTTP_PORT = "drill.exec.http.port"; public static final String HTTP_PORT_HUNT = "drill.exec.http.porthunt"; public static final String HTTP_JETTY_SERVER_DUMP_AFTER_START = "drill.exec.http.jetty.server.dumpAfterStart"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java index 0721557b7f..d6a8108b92 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.control; import java.util.List; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; @@ -56,8 +57,10 @@ public class ControllerImpl implements Controller { @Override public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, final boolean allowPortHunting) { server = new ControlServer(config, connectionRegistry); - int port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_BIT_PORT); - port = server.bind(port, allowPortHunting); + DrillConfig drillConfig = config.getBootstrapContext().getConfig(); + String bindAddr = drillConfig.getString(ExecConstants.RPC_BIND_ADDR); + int port = drillConfig.getInt(ExecConstants.INITIAL_BIT_PORT); + port = server.bind(bindAddr, port, allowPortHunting); DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build(); connectionRegistry.setLocalEndpoint(completeEndpoint); handlerRegistry.setEndpoint(completeEndpoint); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java index 6265012f22..995535dd32 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.data; import java.util.concurrent.ConcurrentMap; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; @@ -52,10 +53,13 @@ public class DataConnectionCreator implements AutoCloseable { public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting) { server = new DataServer(config); int port = partialEndpoint.getControlPort() + 1; - if (config.getBootstrapContext().getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) { - port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_DATA_PORT); + DrillConfig drillConfig = config.getBootstrapContext().getConfig(); + + String bindAddr = drillConfig.getString(ExecConstants.RPC_BIND_ADDR); + if (drillConfig.hasPath(ExecConstants.INITIAL_DATA_PORT)) { + port = drillConfig.getInt(ExecConstants.INITIAL_DATA_PORT); } - port = server.bind(port, allowPortHunting); + port = server.bind(bindAddr, port, allowPortHunting); return partialEndpoint.toBuilder().setDataPort(port).build(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java index 673976d8b2..e6f1499726 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java @@ -156,8 +156,9 @@ public class WebServer implements AutoCloseable { final int acceptors = config.getInt(ExecConstants.HTTP_JETTY_SERVER_ACCEPTORS); final int selectors = config.getInt(ExecConstants.HTTP_JETTY_SERVER_SELECTORS); + String bindAddr = config.getString(ExecConstants.HTTP_BIND_ADDR); int port = config.getInt(ExecConstants.HTTP_PORT); - ServerConnector connector = createConnector(port, acceptors, selectors); + ServerConnector connector = createConnector(bindAddr, port, acceptors, selectors); final int handlers = config.getInt(ExecConstants.HTTP_JETTY_SERVER_HANDLERS); threadPool.setMaxThreads(handlers + connector.getAcceptors() + connector.getSelectorManager().getSelectorCount()); @@ -310,7 +311,7 @@ public class WebServer implements AutoCloseable { public int getPort() { if (!isRunning()) { - throw new UnsupportedOperationException("Http is not enabled"); + throw new UnsupportedOperationException("HTTP server is not enabled"); } return ((ServerConnector)embeddedJetty.getConnectors()[0]).getPort(); } @@ -319,16 +320,21 @@ public class WebServer implements AutoCloseable { return (embeddedJetty != null && embeddedJetty.getConnectors().length == 1); } - private ServerConnector createConnector(int port, int acceptors, int selectors) throws Exception { + private ServerConnector createConnector( + String bindAddr, + int port, + int acceptors, + int selectors + ) throws Exception { final ServerConnector serverConnector; if (config.getBoolean(ExecConstants.HTTP_ENABLE_SSL)) { try { - serverConnector = createHttpsConnector(port, acceptors, selectors); + serverConnector = createHttpsConnector(bindAddr, port, acceptors, selectors); } catch (DrillException e) { throw new DrillbitStartupException(e.getMessage(), e); } } else { - serverConnector = createHttpConnector(port, acceptors, selectors); + serverConnector = createHttpConnector(bindAddr, port, acceptors, selectors); } return serverConnector; @@ -341,8 +347,13 @@ public class WebServer implements AutoCloseable { * * @return Initialized {@link ServerConnector} for HTTPS connections. */ - private ServerConnector createHttpsConnector(int port, int acceptors, int selectors) throws Exception { - logger.info("Setting up HTTPS connector for web server"); + private ServerConnector createHttpsConnector( + String bindAddr, + int port, + int acceptors, + int selectors + ) throws Exception { + logger.info("Setting up HTTPS connector for web server at {}:{}", bindAddr, port); SslContextFactory sslContextFactory = new SslContextFactoryConfigurator(config, workManager.getContext().getEndpoint().getAddress()) .configureNewSslContextFactory(); @@ -354,6 +365,7 @@ public class WebServer implements AutoCloseable { null, null, null, acceptors, selectors, new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()), new HttpConnectionFactory(httpsConfig)); + sslConnector.setHost(bindAddr); sslConnector.setPort(port); return sslConnector; @@ -364,10 +376,11 @@ public class WebServer implements AutoCloseable { * * @return Initialized {@link ServerConnector} instance for HTTP connections. */ - private ServerConnector createHttpConnector(int port, int acceptors, int selectors) { - logger.info("Setting up HTTP connector for web server"); + private ServerConnector createHttpConnector(String bindAddr, int port, int acceptors, int selectors) { + logger.info("Setting up HTTP connector for web server at {}:{}", bindAddr, port); final ServerConnector httpConnector = new ServerConnector(embeddedJetty, null, null, null, acceptors, selectors, new HttpConnectionFactory(baseHttpConfig())); + httpConnector.setHost(bindAddr); httpConnector.setPort(port); return httpConnector; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java index 6ae6f44f4b..06daf6a653 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.DrillVersionInfo; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.DrillbitStartupException; @@ -53,6 +54,7 @@ public class ServiceEngine implements AutoCloseable { private final DataConnectionCreator dataPool; private final String hostName; + private final String bindAddr; private final int intialUserPort; private final boolean allowPortHunting; private final boolean isDistributedMode; @@ -73,14 +75,16 @@ public class ServiceEngine implements AutoCloseable { dataAllocator = newAllocator(context, "rpc:bit-data", "drill.exec.rpc.bit.server.memory.data.reservation", "drill.exec.rpc.bit.server.memory.data.maximum"); + DrillConfig drillConfig = context.getConfig(); final EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup( - context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-"); + drillConfig.getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-"); userServer = new UserServer(context, userAllocator, eventLoopGroup, manager.getUserWorker()); controller = new ControllerImpl(context, controlAllocator, manager.getControlMessageHandler()); dataPool = new DataConnectionCreator(context, dataAllocator, manager.getWorkBus(), manager.getBee()); hostName = context.getHostName(); - intialUserPort = context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT); + bindAddr = drillConfig.getString(ExecConstants.RPC_BIND_ADDR); + intialUserPort = drillConfig.getInt(ExecConstants.INITIAL_USER_PORT); this.allowPortHunting = allowPortHunting; this.isDistributedMode = isDistributedMode; } @@ -93,11 +97,13 @@ public class ServiceEngine implements AutoCloseable { public DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException { // loopback address check - if (isDistributedMode && InetAddress.getByName(hostName).isLoopbackAddress()) { - throw new DrillbitStartupException("Drillbit is disallowed to bind to loopback address in distributed mode."); + if (isDistributedMode && InetAddress.getByName(bindAddr).isLoopbackAddress()) { + throw new DrillbitStartupException( + "Drillbit may not bind to a loopback address in distributed mode." + ); } - userPort = userServer.bind(intialUserPort, allowPortHunting); + userPort = userServer.bind(bindAddr, intialUserPort, allowPortHunting); DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder() .setAddress(hostName) .setUserPort(userPort) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index d58747c711..c57a7c2e9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -30,6 +30,7 @@ import java.util.NoSuchElementException; import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; @@ -491,8 +492,16 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { flushParquetFileWriter(); } } finally { - store.close(); - pageStore.close(); + AutoCloseables.closeSilently(pageStore); + + // ColumnWriteStore doesn't implement AutoCloseable so a manual safe + // closure must be written. + try { + store.close(); + } catch (Exception e) { + logger.warn("Error closing {}", store, e); + } + codecFactory.release(); store = null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index b9c365cace..1a8e6f27a0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -231,7 +231,14 @@ public class ParquetRecordReader extends CommonParquetRecordReader { "\nRow group index: " + rowGroupIndex + "\nRecords to read: " + numRecordsToRead, e); } finally { - parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS)); + if (parquetReaderStats != null) { + parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS)); + } else { + logger.warn( + "Cannot log batch read timing because no Parquet reader stats tracker " + + "is available (probably due to an earlier error during query execution)." + ); + } } } diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java index 31a304e8bd..28dfc27859 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java @@ -57,7 +57,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @InterfaceAudience.Private -public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore, Closeable { +public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore, +AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class); private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 749bd11b64..488d8af088 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -104,7 +104,8 @@ drill.exec: { } } }, - use.ip: false + use.ip: false, + bind_addr: "0.0.0.0" }, optimizer: { implementation: "org.apache.drill.exec.opt.IdentityOptimizer" @@ -154,6 +155,7 @@ drill.exec: { }, ssl_enabled: false, porthunt: false, + bind_addr: "0.0.0.0", port: 8047, jetty : { server : { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java index 65b0351dce..a00401c9f7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java @@ -81,6 +81,7 @@ import static org.mockito.Mockito.when; @Category(SecurityTest.class) public class TestBitBitKerberos extends ClusterTest { private static KerberosHelper krbHelper; + private static final String BIND_ADDR = "127.0.0.1"; private int port = 1234; @@ -172,7 +173,7 @@ public class TestBitBitKerberos extends ClusterTest { new DataServerRequestHandler(workBus, bee)); DataServer server = new DataServer(config); - port = server.bind(port, true); + port = server.bind(BIND_ADDR, port, true); DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); DataConnectionManager connectionManager = new DataConnectionManager(ep, config); DataTunnel tunnel = new DataTunnel(connectionManager); @@ -218,7 +219,7 @@ public class TestBitBitKerberos extends ClusterTest { new DataConnectionConfig(c2.getAllocator(), c2, new DataServerRequestHandler(workBus, bee)); final DataServer server = new DataServer(config); - port = server.bind(port, true); + port = server.bind(BIND_ADDR, port, true); DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); final DataConnectionManager connectionManager = new DataConnectionManager(ep, config); final DataTunnel tunnel = new DataTunnel(connectionManager); @@ -265,7 +266,7 @@ public class TestBitBitKerberos extends ClusterTest { new DataServerRequestHandler(workBus, bee)); final DataServer server = new DataServer(config); - port = server.bind(port, true); + port = server.bind(BIND_ADDR, port, true); final DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); final DataConnectionManager connectionManager = new DataConnectionManager(ep, config); final DataTunnel tunnel = new DataTunnel(connectionManager); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java index 20a734055b..4b145dfb04 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java @@ -61,6 +61,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestBitRpc extends ExecTest { + + private static final String BIND_ADDR = "127.0.0.1"; + @Test public void testConnectionBackpressure() throws Exception { final WorkerBee bee = mock(WorkerBee.class); @@ -80,7 +83,7 @@ public class TestBitRpc extends ExecTest { new DataServerRequestHandler(workBus, bee)); DataServer server = new DataServer(config); - port = server.bind(port, true); + port = server.bind(BIND_ADDR, port, true); DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); DataConnectionManager manager = new DataConnectionManager(ep, config); DataTunnel tunnel = new DataTunnel(manager); @@ -113,7 +116,7 @@ public class TestBitRpc extends ExecTest { new DataServerRequestHandler(workBus, bee)); DataServer server = new DataServer(config); - port = server.bind(port, true); + port = server.bind(BIND_ADDR, port, true); DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); DataConnectionManager manager = new DataConnectionManager(ep, config); DataTunnel tunnel = new DataTunnel(manager); diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index 5a6bed838e..2870d6672f 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -189,11 +189,11 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio return null; } - public int bind(final int initialPort, boolean allowPortHunting) { + public int bind(String bindAddr, final int initialPort, boolean allowPortHunting) { int port = initialPort - 1; while (true) { try { - b.bind(++port).sync(); + b.bind(bindAddr, ++port).sync(); break; } catch (Exception e) { // TODO(DRILL-3026): Revisit: Exception is not (always) BindException.