This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch use-zk-cache-base-data-accessor-to-get-instance-config in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 9c239e9c580d7d96c6dc12c63866baf6d5ebbad2 Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Wed Jan 2 16:20:19 2019 -0800 Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager --- .../helix/core/PinotHelixResourceManager.java | 53 ++++++++++++++++++---- .../controller/helix/PinotResourceManagerTest.java | 10 ++++ 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java index 423db8c..f3e936f 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -84,6 +84,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.commons.configuration.Configuration; import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.ClusterMessagingService; import org.apache.helix.Criteria; import org.apache.helix.HelixAdmin; @@ -93,7 +94,10 @@ import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -128,6 +132,7 @@ public class PinotHelixResourceManager { private HelixAdmin _helixAdmin; private ZkHelixPropertyStore<ZNRecord> _propertyStore; private HelixDataAccessor _helixDataAccessor; + private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor; private Builder _keyBuilder; private SegmentDeletionManager _segmentDeletionManager; private TableRebalancer _tableRebalancer; @@ -168,6 +173,16 @@ public class PinotHelixResourceManager { _helixAdmin = _helixZkManager.getClusterManagmentTool(); _propertyStore = _helixZkManager.getHelixPropertyStore(); _helixDataAccessor = _helixZkManager.getHelixDataAccessor(); + // Cache instance zk paths. + BaseDataAccessor<ZNRecord> baseDataAccessor = _helixDataAccessor.getBaseDataAccessor(); + if (baseDataAccessor instanceof ZkBaseDataAccessor) { + String instanceConfigs = PropertyPathBuilder.instanceConfig(_helixClusterName); + _cacheInstanceConfigsDataAccessor = + new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs, + Collections.singletonList(instanceConfigs), Collections.singletonList(instanceConfigs)); + } else { + _cacheInstanceConfigsDataAccessor = null; + } _keyBuilder = _helixDataAccessor.keyBuilder(); _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore); ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster); @@ -261,6 +276,10 @@ public class PinotHelixResourceManager { return _keyBuilder; } + public ZkCacheBaseDataAccessor<ZNRecord> getCacheInstanceConfigsDataAccessor() { + return _cacheInstanceConfigsDataAccessor; + } + /** * Returns the config for all the Helix instances in the cluster. */ @@ -276,6 +295,11 @@ public class PinotHelixResourceManager { */ @Nonnull public InstanceConfig getHelixInstanceConfig(@Nonnull String instanceId) { + if (_cacheInstanceConfigsDataAccessor != null) { + LOGGER.info("Get instance config for instance {} from cluster {}.", instanceId, _helixClusterName); + ZNRecord znRecord = _cacheInstanceConfigsDataAccessor.get("/" + instanceId, null, AccessOption.PERSISTENT); + return new InstanceConfig(znRecord); + } return _helixAdmin.getInstanceConfig(_helixClusterName, instanceId); } @@ -1519,7 +1543,7 @@ public class PinotHelixResourceManager { ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName, - rawTableName, refreshMessage, recipientCriteria); + rawTableName, refreshMessage, recipientCriteria); // Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message. int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs); if (nMsgsSent > 0) { @@ -1529,7 +1553,7 @@ public class PinotHelixResourceManager { // May be the case when none of the brokers are up yet. That is OK, because when they come up they will get // the latest time boundary info. LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName, - offlineTableName, nMsgsSent); + offlineTableName, nMsgsSent); } } @@ -2189,28 +2213,37 @@ public class PinotHelixResourceManager { /* * Uncomment and use for testing on a real cluster + * */ public static void main(String[] args) throws Exception { final String testZk = "test1.zk.com:12345/pinot-cluster"; - final String realZk = "test2.zk.com:12345/pinot-cluster"; + final String realZk = "zk-lca1-pinot.stg.linkedin.com:12913/pinot-cluster"; final String zkURL = realZk; - final String clusterName = "mpSprintDemoCluster"; + final String clusterName = "pinot"; final String helixClusterName = clusterName; final String controllerInstanceId = "local-hostname"; final String localDiskDir = "/var/tmp/Controller"; final long externalViewOnlineToOfflineTimeoutMillis = 100L; final boolean isSingleTenantCluster = false; final boolean isUpdateStateModel = false; - MetricsRegistry metricsRegistry = new MetricsRegistry(); +// MetricsRegistry metricsRegistry = new MetricsRegistry(); final boolean dryRun = true; final String tableName = "testTable"; final TableType tableType = TableType.OFFLINE; PinotHelixResourceManager helixResourceManager = new PinotHelixResourceManager(zkURL, helixClusterName, controllerInstanceId, localDiskDir, - externalViewOnlineToOfflineTimeoutMillis, isSingleTenantCluster, isUpdateStateModel); + externalViewOnlineToOfflineTimeoutMillis, isSingleTenantCluster, isUpdateStateModel, false); helixResourceManager.start(); - ZNRecord record = helixResourceManager.rebalanceTable(tableName, dryRun, tableType); - ObjectMapper mapper = new ObjectMapper(); - System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(record)); + + Set<String> instances = new HashSet<>(); + instances.add("Server_lca1-app0168.stg.linkedin.com_8001"); + instances.add("Server_lca1-app0170.stg.linkedin.com_8001"); + BiMap<String, String> map = helixResourceManager.getDataInstanceAdminEndpoints(instances); + for (Map.Entry<String, String> entry : map.entrySet()) { + System.out.println(entry.getKey() + " " + entry.getValue()); + } + System.out.println("~~~"); +// ZNRecord record = helixResourceManager.rebalanceTable(tableName, dryRun, tableType); +// ObjectMapper mapper = new ObjectMapper(); +// System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(record)); } - */ } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java index e3c59d4..08c7bc6 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java @@ -26,9 +26,12 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; +import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -66,6 +69,13 @@ public class PinotResourceManagerTest { Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(), 1); + // Verifying instance configs. + ZNRecord znRecord = _pinotHelixResourceManager.getCacheInstanceConfigsDataAccessor().get("/Server_localhost_0", null, AccessOption.PERSISTENT); + Assert.assertNotNull(znRecord); + Assert.assertEquals("Server_localhost_0", znRecord.getId()); + InstanceConfig instanceConfig = new InstanceConfig(znRecord); + Assert.assertEquals("Server_localhost", instanceConfig.getHostName()); + // Adding table TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).build(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
