jpisaac commented on code in PR #2075: URL: https://github.com/apache/phoenix/pull/2075#discussion_r2004273738
########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; + +public class HAGroupStoreManager { + private static HAGroupStoreManager cacheInstance; + private final HAGroupStoreClient haGroupStoreClient; + private final boolean mutationBlockEnabled; + + /** + * Creates/gets an instance of HAGroupStoreManager. 
+ * + * @param conf configuration + * @return cache + */ + public static HAGroupStoreManager getInstance(Configuration conf) throws Exception { + HAGroupStoreManager result = cacheInstance; Review Comment: Not sure if we need a local variable, Testing on the instance variable should be good enough ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; + +public class HAGroupStoreManager { + private static HAGroupStoreManager cacheInstance; Review Comment: nit: maybe good to call the variable name as storeMgrInstance or something? ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; + +public class HAGroupStoreManager { + private static HAGroupStoreManager cacheInstance; + private final HAGroupStoreClient haGroupStoreClient; + private final boolean mutationBlockEnabled; + + /** + * Creates/gets an instance of HAGroupStoreManager. + * + * @param conf configuration + * @return cache + */ + public static HAGroupStoreManager getInstance(Configuration conf) throws Exception { + HAGroupStoreManager result = cacheInstance; + if (result == null) { + synchronized (HAGroupStoreClient.class) { Review Comment: This should be synchronized on HAGroupStoreManager.class object ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; + +public class HAGroupStoreManager { + private static HAGroupStoreManager cacheInstance; Review Comment: Add a volatile keyword, that will ensure any multi-thread access are more safe. ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.INITIALIZED; + + +/** + * Write-through cache for HAGroupStore. + * Uses {@link PathChildrenCache} from {@link org.apache.curator.framework.CuratorFramework}. + */ +public class HAGroupStoreClient implements Closeable { + + private static final long HA_CACHE_INITIALIZATION_TIMEOUT_MS = 30000L; + private static HAGroupStoreClient cacheInstance; + private final PhoenixHAAdmin phoenixHaAdmin; + private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); + // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>> + private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap + = new ConcurrentHashMap<>(); + private final PathChildrenCache pathChildrenCache; + private boolean isHealthy; + + /** + * Creates/gets an instance of HAGroupStoreClient. 
+ * + * @param conf configuration + * @return cache + */ + public static HAGroupStoreClient getInstance(Configuration conf) throws Exception { + HAGroupStoreClient result = cacheInstance; + if (result == null) { + synchronized (HAGroupStoreClient.class) { + result = cacheInstance; + if (result == null) { + cacheInstance = result = new HAGroupStoreClient(conf); + } + } + } + return result; + } + + private HAGroupStoreClient(final Configuration conf) throws Exception { + this.phoenixHaAdmin = new PhoenixHAAdmin(conf); + final PathChildrenCache pathChildrenCache = new PathChildrenCache(phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true); + final CountDownLatch latch = new CountDownLatch(1); + pathChildrenCache.getListenable().addListener((client, event) -> { + LOGGER.info("HAGroupStoreClient PathChildrenCache Received event for type {}", event.getType()); + final ChildData childData = event.getData(); + ClusterRoleRecord eventCRR = extractCRROrNull(childData); + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + if (eventCRR != null && eventCRR.getHaGroupName() != null) { + updateClusterRoleRecordMap(eventCRR); + } + break; + case CHILD_REMOVED: + // In case of CHILD_REMOVED, we get the old version of data that was just deleted. 
+ if (eventCRR != null && eventCRR.getHaGroupName() != null + && !eventCRR.getHaGroupName().isEmpty() + && eventCRR.getRole(phoenixHaAdmin.getZkUrl()) != null) { + final ClusterRoleRecord.ClusterRole role = eventCRR.getRole(phoenixHaAdmin.getZkUrl()); + clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>()); + clusterRoleToCRRMap.get(role).remove(eventCRR.getHaGroupName()); + } + break; + case INITIALIZED: + latch.countDown(); + break; + case CONNECTION_LOST: + case CONNECTION_SUSPENDED: + isHealthy = false; + break; + case CONNECTION_RECONNECTED: + isHealthy = true; + break; + default: + LOGGER.warn("Unexpected event type {}, complete event {}", event.getType(), event); + } + }); + pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + this.pathChildrenCache = pathChildrenCache; + isHealthy = latch.await(HA_CACHE_INITIALIZATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + buildClusterRoleToCRRMap(); + } + + private ClusterRoleRecord extractCRROrNull(final ChildData childData) { + if (childData != null) { + byte[] data = childData.getData(); + return ClusterRoleRecord.fromJson(data).orElse(null); + } + return null; + } + + private void updateClusterRoleRecordMap(final ClusterRoleRecord crr) { + ClusterRoleRecord.ClusterRole role = crr.getRole(phoenixHaAdmin.getZkUrl()); + if (role != null) { + clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>()); + clusterRoleToCRRMap.get(role).put(crr.getHaGroupName(), crr); + // Remove any pre-existing mapping with any other role for this HAGroupName + for(ClusterRoleRecord.ClusterRole mapRole : clusterRoleToCRRMap.keySet()) { + if (mapRole != role) { + ConcurrentHashMap<String, ClusterRoleRecord> roleWiseMap = clusterRoleToCRRMap.get(mapRole); + roleWiseMap.remove(crr.getHaGroupName()); Review Comment: It might be informative to add logging here, especially related to the versions being added and removed. We are piggybacking on the ZK guarantees of ordering of events. 
########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.INITIALIZED; + + +/** + * Write-through cache for HAGroupStore. + * Uses {@link PathChildrenCache} from {@link org.apache.curator.framework.CuratorFramework}. 
+ */ +public class HAGroupStoreClient implements Closeable { + + private static final long HA_CACHE_INITIALIZATION_TIMEOUT_MS = 30000L; + private static HAGroupStoreClient cacheInstance; Review Comment: same comments here as before - volatile keyword and variable naming ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.INITIALIZED; + + +/** + * Write-through cache for HAGroupStore. + * Uses {@link PathChildrenCache} from {@link org.apache.curator.framework.CuratorFramework}. + */ +public class HAGroupStoreClient implements Closeable { + + private static final long HA_CACHE_INITIALIZATION_TIMEOUT_MS = 30000L; + private static HAGroupStoreClient cacheInstance; + private final PhoenixHAAdmin phoenixHaAdmin; + private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); + // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>> + private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap + = new ConcurrentHashMap<>(); + private final PathChildrenCache pathChildrenCache; + private boolean isHealthy; + + /** + * Creates/gets an instance of HAGroupStoreClient. 
+ * + * @param conf configuration + * @return cache + */ + public static HAGroupStoreClient getInstance(Configuration conf) throws Exception { + HAGroupStoreClient result = cacheInstance; + if (result == null) { + synchronized (HAGroupStoreClient.class) { + result = cacheInstance; + if (result == null) { + cacheInstance = result = new HAGroupStoreClient(conf); + } + } + } + return result; + } + + private HAGroupStoreClient(final Configuration conf) throws Exception { + this.phoenixHaAdmin = new PhoenixHAAdmin(conf); + final PathChildrenCache pathChildrenCache = new PathChildrenCache(phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true); Review Comment: Good to add multi-threading tests and zookeeper down tests and verify ZK event ordering ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.INITIALIZED; + + +/** + * Write-through cache for HAGroupStore. + * Uses {@link PathChildrenCache} from {@link org.apache.curator.framework.CuratorFramework}. + */ +public class HAGroupStoreClient implements Closeable { + + private static final long HA_CACHE_INITIALIZATION_TIMEOUT_MS = 30000L; + private static HAGroupStoreClient cacheInstance; + private final PhoenixHAAdmin phoenixHaAdmin; + private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); + // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>> + private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap + = new ConcurrentHashMap<>(); + private final PathChildrenCache pathChildrenCache; + private boolean isHealthy; + + /** + * Creates/gets an instance of HAGroupStoreClient. 
+ * + * @param conf configuration + * @return cache + */ + public static HAGroupStoreClient getInstance(Configuration conf) throws Exception { + HAGroupStoreClient result = cacheInstance; + if (result == null) { + synchronized (HAGroupStoreClient.class) { + result = cacheInstance; + if (result == null) { + cacheInstance = result = new HAGroupStoreClient(conf); + } + } + } + return result; + } + + private HAGroupStoreClient(final Configuration conf) throws Exception { + this.phoenixHaAdmin = new PhoenixHAAdmin(conf); + final PathChildrenCache pathChildrenCache = new PathChildrenCache(phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true); + final CountDownLatch latch = new CountDownLatch(1); + pathChildrenCache.getListenable().addListener((client, event) -> { + LOGGER.info("HAGroupStoreClient PathChildrenCache Received event for type {}", event.getType()); + final ChildData childData = event.getData(); + ClusterRoleRecord eventCRR = extractCRROrNull(childData); + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + if (eventCRR != null && eventCRR.getHaGroupName() != null) { + updateClusterRoleRecordMap(eventCRR); + } + break; + case CHILD_REMOVED: + // In case of CHILD_REMOVED, we get the old version of data that was just deleted. + if (eventCRR != null && eventCRR.getHaGroupName() != null + && !eventCRR.getHaGroupName().isEmpty() + && eventCRR.getRole(phoenixHaAdmin.getZkUrl()) != null) { + final ClusterRoleRecord.ClusterRole role = eventCRR.getRole(phoenixHaAdmin.getZkUrl()); Review Comment: Need to verify the getZkUrl() format is the same as the one in the CRR. Maybe have tests around it. We have had issues with ZOOKEEPER_QUORUM url formats before. FYI @lokiore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
