This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit cd821cd620b6821cbe3d4ccf7072efeb3b924d32 Author: Hunter Lee <[email protected]> AuthorDate: Thu May 9 15:59:59 2019 -0700 Fix critical Task Framework throttle bug Task throttling feature had a logical bug where it wouldn't count any of the pending task assignments, which was breaking task throttling. This diff fixes it. RB=1661127 BUG=HELIX-1875 G=helix-reviewers A=jjwang Signed-off-by: Hunter Lee <[email protected]> --- .../org/apache/helix/ClusterMessagingService.java | 17 +++---- .../stages/CurrentStateComputationStage.java | 5 ++ .../controller/stages/CurrentStateOutput.java | 35 +++++++++----- .../integration/manager/TestZkHelixAdmin.java | 44 +++++++++++++++++- .../spectator/TestRoutingTableSnapshot.java | 3 +- .../helix/manager/zk/TestZNRecordSizeLimit.java | 53 +++++++++++----------- .../TestClusterStatusMonitorLifecycle.java | 49 ++++---------------- 7 files changed, 117 insertions(+), 89 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java index 28188e0..96a5957 100644 --- a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java +++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java @@ -37,7 +37,7 @@ import org.apache.helix.model.Message; public interface ClusterMessagingService { /** * Send message matching the specifications mentioned in recipientCriteria. - * @param receipientCriteria criteria to be met, defined as {@link Criteria} + * @param recipientCriteria criteria to be met, defined as {@link Criteria} * @See Criteria * @param message * message to be sent. Some attributes of this message will be @@ -55,24 +55,24 @@ public interface ClusterMessagingService { * This is useful when message need to be sent and current thread need not * wait for response since processing will be done in another thread. 
* @see #send(Criteria, Message) - * @param receipientCriteria + * @param recipientCriteria * @param message * @param callbackOnReply callback to trigger on completion * @param timeOut Time to wait before failing the send * @return the number of messages that were successfully sent */ - int send(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply, int timeOut); + int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply, int timeOut); /** * @see #send(Criteria, Message, AsyncCallback, int) - * @param receipientCriteria + * @param recipientCriteria * @param message * @param callbackOnReply * @param timeOut * @param retryCount maximum number of times to retry the send * @return the number of messages that were successfully sent */ - int send(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply, + int send(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply, int timeOut, int retryCount); /** @@ -86,14 +86,13 @@ public interface ClusterMessagingService { * The current thread can use callbackOnReply instance to store application * specific data. * @see #send(Criteria, Message, AsyncCallback, int) - * @param receipientCriteria + * @param recipientCriteria * @param message * @param callbackOnReply * @param timeOut - * @param retryCount * @return the number of messages that were successfully sent */ - int sendAndWait(Criteria receipientCriteria, Message message, AsyncCallback callbackOnReply, + int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback callbackOnReply, int timeOut); /** @@ -144,7 +143,7 @@ public interface ClusterMessagingService { /** * This will generate all messages to be sent given the recipientCriteria and MessageTemplate, * the messages are not sent. 
- * @param receipientCriteria criteria to be met, defined as {@link Criteria} + * @param recipientCriteria criteria to be met, defined as {@link Criteria} * @param messageTemplate the Message on which to base the messages to send * @return messages to be sent, grouped by the type of instance to send the message to */ diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 72d3688..0bf4d28 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -121,6 +121,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage { } } } + + // Add the state model into the map for lookup of Task Framework pending partitions + if (resource.getStateModelDefRef() != null) { + currentStateOutput.setResourceStateModelDef(resourceName, resource.getStateModelDefRef()); + } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java index b634703..13e1dbf 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java @@ -110,8 +110,8 @@ public class CurrentStateOutput { _currentStateMap.get(resourceName).get(partition).put(instanceName, state); } - public void setEndTime(String resourceName, Partition partition, - String instanceName, Long timestamp) { + public void setEndTime(String resourceName, Partition partition, String instanceName, + Long timestamp) { if (!_currentStateEndTimeMap.containsKey(resourceName)) { _currentStateEndTimeMap.put(resourceName, new HashMap<Partition, Map<String, Long>>()); } @@ -193,8 +193,7 @@ public 
class CurrentStateOutput { return null; } - public Long getEndTime(String resourceName, Partition partition, - String instanceName) { + public Long getEndTime(String resourceName, Partition partition, String instanceName) { Map<Partition, Map<String, Long>> partitionInfo = _currentStateEndTimeMap.get(resourceName); if (partitionInfo != null) { Map<String, Long> instanceInfo = partitionInfo.get(partition); @@ -279,7 +278,7 @@ public class CurrentStateOutput { */ public Map<Partition, Map<String, String>> getCurrentStateMap(String resourceName) { if (_currentStateMap.containsKey(resourceName)) { - return _currentStateMap.get(resourceName); + return _currentStateMap.get(resourceName); } return Collections.emptyMap(); } @@ -356,32 +355,43 @@ public class CurrentStateOutput { } /** - * Get the partitions count for each participant with the pending state and given resource state model + * Get the partitions count for each participant with the pending state and given resource state + * model * @param resourceStateModel specified resource state model to look up * @param state specified pending resource state to look up * @return set of participants to partitions mapping */ - public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) { + public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, + String state) { return getPartitionCountWithState(resourceStateModel, state, (Map) _pendingMessageMap); } /** - * Get the partitions count for each participant in the current state and with given resource state model + * Get the partitions count for each participant in the current state and with given resource + * state model * @param resourceStateModel specified resource state model to look up * @param state specified current resource state to look up * @return set of participants to partitions mapping */ - public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String 
state) { + public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, + String state) { return getPartitionCountWithState(resourceStateModel, state, (Map) _currentStateMap); } + /** + * Count partitions in pendingStates and currentStates. + * @param resourceStateModel + * @param state + * @param stateMap + * @return + */ private Map<String, Integer> getPartitionCountWithState(String resourceStateModel, String state, Map<String, Map<Partition, Map<String, Object>>> stateMap) { Map<String, Integer> currentPartitionCount = new HashMap<>(); for (String resource : stateMap.keySet()) { String stateModel = _resourceStateModelMap.get(resource); - if ((stateModel != null && stateModel.equals(resourceStateModel)) || (stateModel == null - && resourceStateModel == null)) { + if ((stateModel != null && stateModel.equals(resourceStateModel)) + || (stateModel == null && resourceStateModel == null)) { for (Partition partition : stateMap.get(resource).keySet()) { Map<String, Object> partitionMessage = stateMap.get(resource).get(partition); for (Map.Entry<String, Object> participantMap : partitionMessage.entrySet()) { @@ -399,7 +409,8 @@ public class CurrentStateOutput { currState = curStateObj.toString(); } } - if ((currState != null && currState.equals(state)) || (currState == null && state == null)) { + if ((currState != null && currState.equals(state)) + || (currState == null && state == null)) { currentPartitionCount.put(participant, currentPartitionCount.get(participant) + 1); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java index 5141a8d..0dfdfb4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java @@ -19,16 +19,20 @@ package org.apache.helix.integration.manager; * under the 
License. */ +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; +import org.apache.helix.api.config.ViewClusterSourceConfig; import org.apache.helix.integration.task.MockTask; import org.apache.helix.integration.task.TaskTestBase; import org.apache.helix.integration.task.WorkflowGenerator; import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; @@ -57,6 +61,26 @@ public class TestZkHelixAdmin extends TaskTestBase { } @Test + public void testViewClusterOperations() { + String testCluster = "testViewCluster"; + List<ViewClusterSourceConfig> sourceConfigs = generateViewClusterSourceConfig(); + int refreshPeriod = 10; + + _admin.addCluster(testCluster); + ClusterConfig config = _configAccessor.getClusterConfig(testCluster); + config.setViewCluster(); + config.setViewClusterRefreshPeriod(refreshPeriod); + config.setViewClusterSourceConfigs(sourceConfigs); + _configAccessor.setClusterConfig(testCluster, config); + + ClusterConfig fetchedConfig = _configAccessor.getClusterConfig(testCluster); + Assert.assertTrue(fetchedConfig.isViewCluster()); + Assert.assertEquals(fetchedConfig.getViewClusterSourceConfigs().size(), sourceConfigs.size()); + Assert.assertEquals(fetchedConfig.getViewClusterRefershPeriod(), refreshPeriod); + _admin.dropCluster(testCluster); + } + + @Test public void testEnableDisablePartitions() throws InterruptedException { _admin.enablePartition(false, CLUSTER_NAME, (PARTICIPANT_PREFIX + "_" + _startPort), WorkflowGenerator.DEFAULT_TGT_DB, Arrays.asList(new String[] { "TestDB_0", "TestDB_2" })); @@ -88,4 +112,22 @@ public class TestZkHelixAdmin extends TaskTestBase { 
Assert.assertEquals(jobContext.getPartitionState(1), TaskPartitionState.COMPLETED); Assert.assertEquals(jobContext.getPartitionState(2), null); } -} \ No newline at end of file + + private List<ViewClusterSourceConfig> generateViewClusterSourceConfig() { + String clusterNamePrefix = "mySourceCluster"; + String zkConnection = "zookeeper.test.com:2121"; + String testJsonTemplate = + "{\"name\": \"%s\", \"zkAddress\": \"%s\", \"properties\": [\"%s\", \"%s\", \"%s\"]}"; + + List<ViewClusterSourceConfig> sourceConfigs = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String clusterName = clusterNamePrefix + i; + String configJSON = String + .format(testJsonTemplate, clusterName, zkConnection, PropertyType.INSTANCES.name(), + PropertyType.EXTERNALVIEW.name(), PropertyType.LIVEINSTANCES.name()); + + sourceConfigs.add(ViewClusterSourceConfig.fromJson(configJSON)); + } + return sourceConfigs; + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java index 3b498c3..216c900 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java +++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java @@ -132,4 +132,5 @@ public class TestRoutingTableSnapshot extends ZkTestBase { Assert.assertEquals(slaveInsEv.size(), 2); } } -} \ No newline at end of file +} + diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java index bccb425..36e26e7 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java @@ -81,23 +81,25 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { _gZkClient.createPersistent(path2, 
true); try { _gZkClient.writeData(path2, largeRecord); + Assert.fail("Should fail because data size is larger than 1M"); } catch (HelixException e) { - Assert.fail("Should not fail because data size is larger than 1M since compression applied"); + // OK } record = _gZkClient.readData(path2); - Assert.assertNotNull(record); + Assert.assertNull(record); // oversized write doesn't overwrite existing data on zk record = _gZkClient.readData(path1); try { _gZkClient.writeData(path1, largeRecord); + Assert.fail("Should fail because data size is larger than 1M"); } catch (HelixException e) { - Assert.fail("Should not fail because data size is larger than 1M since compression applied"); + // OK } ZNRecord recordNew = _gZkClient.readData(path1); byte[] arr = serializer.serialize(record); byte[] arrNew = serializer.serialize(recordNew); - Assert.assertFalse(Arrays.equals(arr, arrNew)); + Assert.assertTrue(Arrays.equals(arr, arrNew)); // test ZkDataAccessor ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient); @@ -119,7 +121,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { idealState.getRecord().setSimpleField(i + "", bufStr); } boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState); - Assert.assertTrue(succeed); + Assert.assertFalse(succeed); HelixProperty property = accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1")); @@ -149,11 +151,11 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { } // System.out.println("record: " + idealState.getRecord()); succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState); - Assert.assertTrue(succeed); + Assert.assertFalse(succeed); recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord(); arr = serializer.serialize(record); arrNew = serializer.serialize(recordNew); - Assert.assertFalse(Arrays.equals(arr, arrNew)); + Assert.assertTrue(Arrays.equals(arr, arrNew)); System.out.println("END 
testZNRecordSizeLimitUseZNRecordSerializer at " + new Date(System.currentTimeMillis())); @@ -162,12 +164,12 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { @Test public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() { String className = getShortClassName(); - System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date( - System.currentTimeMillis())); + System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + + new Date(System.currentTimeMillis())); ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer(); - HelixZkClient zkClient = SharedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR)); + HelixZkClient zkClient = + SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR)); try { zkClient.setZkSerializer(serializer); @@ -205,25 +207,25 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { zkClient.createPersistent(path2, true); try { zkClient.writeData(path2, largeRecord); + Assert.fail("Should fail because data size is larger than 1M"); } catch (HelixException e) { - Assert - .fail("Should not fail because data size is larger than 1M since compression applied"); + // OK } record = zkClient.readData(path2); - Assert.assertNotNull(record); + Assert.assertNull(record); // oversized write doesn't overwrite existing data on zk record = zkClient.readData(path1); try { zkClient.writeData(path1, largeRecord); + Assert.fail("Should fail because data size is larger than 1M"); } catch (HelixException e) { - Assert - .fail("Should not fail because data size is larger than 1M since compression applied"); + // OK } ZNRecord recordNew = zkClient.readData(path1); byte[] arr = serializer.serialize(record); byte[] arrNew = serializer.serialize(recordNew); - Assert.assertFalse(Arrays.equals(arr, arrNew)); + Assert.assertTrue(Arrays.equals(arr, arrNew)); // test ZkDataAccessor 
ZKHelixAdmin admin = new ZKHelixAdmin(zkClient); @@ -232,8 +234,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { admin.addInstance(className, instanceConfig); // oversized data should not create any new data on zk - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient)); Builder keyBuilder = accessor.keyBuilder(); // ZNRecord statusUpdates = new ZNRecord("statusUpdates"); @@ -246,9 +247,9 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { idealState.getRecord().setSimpleField(i + "", bufStr); } boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_1"), idealState); - Assert.assertTrue(succeed); + Assert.assertFalse(succeed); HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB_1")); - Assert.assertNotNull(property); + Assert.assertNull(property); // legal sized data gets written to zk idealState.getRecord().getSimpleFields().clear(); @@ -275,18 +276,16 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { } // System.out.println("record: " + idealState.getRecord()); succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB_2"), idealState); - Assert.assertTrue(succeed); + Assert.assertFalse(succeed); recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord(); arr = serializer.serialize(record); arrNew = serializer.serialize(recordNew); - Assert.assertFalse(Arrays.equals(arr, arrNew)); - } catch (HelixException ex) { - Assert.fail("Should not fail because data size is larger than 1M since compression applied"); + Assert.assertTrue(Arrays.equals(arr, arrNew)); } finally { zkClient.close(); } - System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date( - System.currentTimeMillis())); + System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + + new 
Date(System.currentTimeMillis())); } } diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java index 6156666..f84faf5 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java @@ -19,26 +19,21 @@ package org.apache.helix.monitoring; * under the License. */ -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.Date; -import java.util.HashSet; -import java.util.Set; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServerConnection; -import javax.management.MBeanServerNotification; -import javax.management.MalformedObjectNameException; import javax.management.ObjectInstance; import javax.management.ObjectName; import javax.management.Query; import javax.management.QueryExp; +import java.lang.management.ManagementFactory; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterDistributedController; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.IdealState; -import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; @@ -67,8 +62,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase { String className = TestHelper.getTestClassName(); _clusterNamePrefix = className; - System.out - .println("START " + _clusterNamePrefix + " at " + new 
Date(System.currentTimeMillis())); + System.out.println("START " + _clusterNamePrefix + " at " + + new Date(System.currentTimeMillis())); // setup 10 clusters for (int i = 0; i < clusterNb; i++) { @@ -97,7 +92,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase { clusterNb, // partitions per resource n, // number of nodes 3, // replicas - "LeaderStandby", true); // do rebalance + "LeaderStandby", + true); // do rebalance // start distributed cluster controllers _controllers = new ClusterDistributedController[n + n]; @@ -177,32 +173,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase { System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis())); } - class ParticipantMonitorListener extends ClusterMBeanObserver { - - int _nMbeansUnregistered = 0; - int _nMbeansRegistered = 0; - - public ParticipantMonitorListener(String domain) - throws InstanceNotFoundException, IOException, MalformedObjectNameException, - NullPointerException { - super(domain); - } - - @Override - public void onMBeanRegistered(MBeanServerConnection server, - MBeanServerNotification mbsNotification) { - LOG.info("Register mbean: " + mbsNotification.getMBeanName()); - _nMbeansRegistered++; - } - - @Override - public void onMBeanUnRegistered(MBeanServerConnection server, - MBeanServerNotification mbsNotification) { - LOG.info("Unregister mbean: " + mbsNotification.getMBeanName()); - _nMbeansUnregistered++; - } - } - private void cleanupControllers() { for (int i = 0; i < _controllers.length; i++) { if (_controllers[i] != null && _controllers[i].isConnected()) { @@ -326,7 +296,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase { cleanupControllers(); // Check if any MBeans leftover. // Note that MessageQueueStatus is not bound with controller only. So it will still exist. - final QueryExp exp2 = Query.and( Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))), exp1);
