This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.20 in repository https://gitbox.apache.org/repos/asf/drill.git
commit 64c4f25a1926c9f45ef5221710e8e760c05e0782
Author: luoc <l...@apache.org>
AuthorDate: Mon Sep 5 13:48:11 2022 +0800

    DRILL-8275: Prevent the JDBC Client from creating spurious paths in Zookeeper (#2617)
---
 .../drill/exec/coord/zk/ZKClusterCoordinator.java  | 36 +++++++++++++-----
 .../org/apache/drill/exec/server/Drillbit.java     |  3 +-
 .../drill/exec/client/DrillClientStateTest.java    | 43 ++++++++++++++++++++++
 3 files changed, 71 insertions(+), 11 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index aeb0dd0f20..108ca770c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import java.util.regex.Pattern;
 
 import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.drill.shaded.guava.com.google.common.base.Throwables;
+import org.apache.zookeeper.data.Stat;
 import org.apache.commons.collections.keyvalue.MultiKey;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -40,6 +42,7 @@ import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -78,15 +81,17 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<MultiKey,DrillbitEndpoint>();
   private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
 
-  public ZKClusterCoordinator(DrillConfig config, String connect) {
-    this(config, connect, new DefaultACLProvider());
+  public ZKClusterCoordinator(DrillConfig config, String connect) throws Exception {
+    // As a Client, the namespace (aka zkRoot) should not be created.
+    this(config, connect, false, new DefaultACLProvider());
   }
 
-  public ZKClusterCoordinator(DrillConfig config, ACLProvider aclProvider) {
-    this(config, null, aclProvider);
+  public ZKClusterCoordinator(DrillConfig config, ACLProvider aclProvider) throws Exception {
+    // As a Drillbit, it's required to create the zkRoot.
+    this(config, null, true, aclProvider);
   }
 
-  public ZKClusterCoordinator(DrillConfig config, String connect, ACLProvider aclProvider) {
+  public ZKClusterCoordinator(DrillConfig config, String connect, boolean createNamespace, ACLProvider aclProvider) throws Exception {
 
     connect = connect == null || connect.isEmpty() ? config.getString(ExecConstants.ZK_CONNECTION) : connect;
     String clusterId = config.getString(ExecConstants.SERVICE_NAME);
@@ -106,13 +111,23 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
 
     RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
       config.getInt(ExecConstants.ZK_RETRY_DELAY));
-    curator = CuratorFrameworkFactory.builder()
-      .namespace(zkRoot)
+
+    CuratorFrameworkFactory.Builder curatorBuilder = CuratorFrameworkFactory.builder()
       .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
       .retryPolicy(rp)
       .connectString(connect)
-      .aclProvider(aclProvider)
-      .build();
+      .aclProvider(aclProvider);
+
+    if (!createNamespace) { // Using ZK style in client, the root path should exist.
+      try (CuratorFramework client = curatorBuilder.build()) {
+        client.start();
+        Stat stat = client.checkExists().forPath(ZKPaths.PATH_SEPARATOR + zkRoot);
+        Objects.requireNonNull(stat, "The root path does not exist in the Zookeeper.");
+      }
+    }
+
+    curator = curatorBuilder.namespace(zkRoot).build();
+
     curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
     curator.start();
     discovery = newDiscovery();
@@ -219,6 +234,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
    * triggered. State information is used during planning and
    * initial client connection phases.
    */
+  @Override
   public RegistrationHandle update(RegistrationHandle handle, State state) {
     ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
     try {
@@ -348,7 +364,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() {
     return ServiceDiscoveryBuilder
       .builder(DrillbitEndpoint.class)
-      .basePath("/")
+      .basePath(ZKPaths.PATH_SEPARATOR)
       .client(curator)
       .serializer(DrillServiceInstanceHelper.SERIALIZER)
       .build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 3bf20e04e7..b0cb47dde3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.server;
 
 import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.StackTrace;
 import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -179,7 +180,7 @@ public class Drillbit implements AutoCloseable {
     } else {
       String clusterId = config.getString(ExecConstants.SERVICE_NAME);
       String zkRoot = config.getString(ExecConstants.ZK_ROOT);
-      String drillClusterPath = "/" + zkRoot + "/" + clusterId;
+      String drillClusterPath = ZKPaths.PATH_SEPARATOR + zkRoot + ZKPaths.PATH_SEPARATOR + clusterId;
       ACLProvider aclProvider = ZKACLProviderFactory.getACLProvider(config, drillClusterPath, context);
       coord = new ZKClusterCoordinator(config, aclProvider);
       storeProvider = new PersistentStoreRegistry<>(this.coord, config).newPStoreProvider();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientStateTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientStateTest.java
new file mode 100644
index 0000000000..0a25e6e8d4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientStateTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.client;
+
+import org.apache.curator.utils.ZKPaths;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.TestWithZookeeper;
+import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
+import org.junit.Test;
+
+public class DrillClientStateTest extends TestWithZookeeper {
+
+  @Test
+  public void testNotExistZkRoot() throws Exception {
+    // There is no drillbit startup, therefore the root path is not saved in ZK.
+    thrownException.expect(NullPointerException.class);
+    thrownException.expectMessage("root path does not exist");
+    DrillConfig config = DrillConfig.create();
+    String connString = zkHelper.getConnectionString();
+    String zkRoot = config.getString(ExecConstants.ZK_ROOT); // does not exist
+    connString = connString + ZKPaths.PATH_SEPARATOR + zkRoot;
+    try (ZKClusterCoordinator coordinator = new ZKClusterCoordinator(config, connString)) {
+      coordinator.start(10000);
+    }
+  }
+
+}