This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 717895be69 Group commit IdealState updates (#13976)
717895be69 is described below
commit 717895be6961aaa213f57b1438cd7faed2c0f16d
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Sep 14 22:58:00 2024 +0800
Group commit IdealState updates (#13976)
---
.../pinot/common/metrics/ControllerMeter.java | 3 +-
.../pinot/common/utils/helix/HelixHelper.java | 162 +----------
.../common/utils/helix/IdealStateGroupCommit.java | 308 +++++++++++++++++++++
.../pinot/controller/BaseControllerStarter.java | 3 +-
.../api/resources/PinotTableRestletResource.java | 9 +-
.../helix/core/PinotHelixResourceManager.java | 76 ++---
.../realtime/PinotLLCRealtimeSegmentManager.java | 113 ++++----
.../helix/IdealStateGroupCommitTest.java | 112 ++++++++
.../PinotLLCRealtimeSegmentManagerTest.java | 5 -
9 files changed, 524 insertions(+), 267 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index b474a44a6d..ee034ec952 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -67,7 +67,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
TABLE_REBALANCE_RETRY_TOO_MANY_TIMES("TableRebalanceRetryTooManyTimes",
false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false),
IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false),
- IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false);
+ IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false),
+ IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false);
private final String _brokerMeterName;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 4ffad84d61..43e6210e18 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -22,22 +22,17 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
@@ -47,12 +42,8 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
-import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.helix.ExtraInstanceConfig;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metrics.ControllerMeter;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -69,156 +60,38 @@ public class HelixHelper {
private HelixHelper() {
}
- private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION =
1000;
- private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression";
-
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final RetryPolicy
DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY =
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
+
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixHelper.class);
private static final ZNRecordSerializer ZN_RECORD_SERIALIZER = new
ZNRecordSerializer();
+ private static final IdealStateGroupCommit IDEAL_STATE_GROUP_COMMIT = new
IdealStateGroupCommit();
private static final String ONLINE = "ONLINE";
private static final String OFFLINE = "OFFLINE";
public static final String BROKER_RESOURCE =
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
- private static int _minNumCharsInISToTurnOnCompression = -1;
-
- public static synchronized void setMinNumCharsInISToTurnOnCompression(int
minNumChars) {
- _minNumCharsInISToTurnOnCompression = minNumChars;
- }
-
public static IdealState cloneIdealState(IdealState idealState) {
return new IdealState(
(ZNRecord)
ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord())));
}
- /**
- * Updates the ideal state, retrying if necessary in case of concurrent
updates to the ideal state.
- *
- * @param helixManager The HelixManager used to interact with the Helix
cluster
- * @param resourceName The resource for which to update the ideal state
- * @param updater A function that returns an updated ideal state given an
input ideal state
- * @return updated ideal state if successful, null if not
- */
public static IdealState updateIdealState(HelixManager helixManager, String
resourceName,
- Function<IdealState, IdealState> updater, RetryPolicy policy, boolean
noChangeOk) {
- // NOTE: ControllerMetrics could be null because this method might be
invoked by Broker.
- ControllerMetrics controllerMetrics = ControllerMetrics.get();
- try {
- long startTimeMs = System.currentTimeMillis();
- IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
- int retries = policy.attempt(new Callable<>() {
- @Override
- public Boolean call() {
- HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
- PropertyKey idealStateKey =
dataAccessor.keyBuilder().idealStates(resourceName);
- IdealState idealState = dataAccessor.getProperty(idealStateKey);
-
- // Make a copy of the idealState above to pass it to the updater
- // NOTE: new IdealState(idealState.getRecord()) does not work
because it's shallow copy for map fields and
- // list fields
- IdealState idealStateCopy = cloneIdealState(idealState);
-
- IdealState updatedIdealState;
- try {
- updatedIdealState = updater.apply(idealStateCopy);
- } catch (PermanentUpdaterException e) {
- LOGGER.error("Caught permanent exception while updating ideal
state for resource: {}", resourceName, e);
- throw e;
- } catch (Exception e) {
- LOGGER.error("Caught exception while updating ideal state for
resource: {}", resourceName, e);
- return false;
- }
-
- // If there are changes to apply, apply them
- if (updatedIdealState != null &&
!idealState.equals(updatedIdealState)) {
- ZNRecord updatedZNRecord = updatedIdealState.getRecord();
-
- // Update number of partitions
- int numPartitions = updatedZNRecord.getMapFields().size();
- updatedIdealState.setNumPartitions(numPartitions);
-
- // If the ideal state is large enough, enable compression
- boolean enableCompression = shouldCompress(updatedIdealState);
- if (enableCompression) {
- updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true);
- } else {
-
updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY);
- }
-
- // Check version and set ideal state
- try {
- if (dataAccessor.getBaseDataAccessor()
- .set(idealStateKey.getPath(), updatedZNRecord,
idealState.getRecord().getVersion(),
- AccessOption.PERSISTENT)) {
- idealStateWrapper._idealState = updatedIdealState;
- return true;
- } else {
- LOGGER.warn("Failed to update ideal state for resource: {}",
resourceName);
- return false;
- }
- } catch (ZkBadVersionException e) {
- LOGGER.warn("Version changed while updating ideal state for
resource: {}", resourceName);
- return false;
- } catch (Exception e) {
- LOGGER.warn("Caught exception while updating ideal state for
resource: {} (compressed={})", resourceName,
- enableCompression, e);
- return false;
- }
- } else {
- if (noChangeOk) {
- LOGGER.info("Idempotent or null ideal state update for resource
{}, skipping update.", resourceName);
- } else {
- LOGGER.warn("Idempotent or null ideal state update for resource
{}, skipping update.", resourceName);
- }
- idealStateWrapper._idealState = idealState;
- return true;
- }
- }
-
- private boolean shouldCompress(IdealState is) {
- if (is.getNumPartitions() >
NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) {
- return true;
- }
+ Function<IdealState, IdealState> updater) {
+ return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName, updater,
+ DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
+ }
- // Find the number of characters in one partition in idealstate, and
extrapolate
- // to estimate the number of characters.
- // We could serialize the znode to determine the exact size, but
that would mean serializing every
- // idealstate znode twice. We avoid some extra GC by estimating the
size instead. Such estimations
- // should be good for most installations that have similar segment
and instance names.
- Iterator<String> it = is.getPartitionSet().iterator();
- if (it.hasNext()) {
- String partitionName = it.next();
- int numChars = partitionName.length();
- Map<String, String> stateMap =
is.getInstanceStateMap(partitionName);
- for (Map.Entry<String, String> entry : stateMap.entrySet()) {
- numChars += entry.getKey().length();
- numChars += entry.getValue().length();
- }
- numChars *= is.getNumPartitions();
- return _minNumCharsInISToTurnOnCompression > 0 && numChars >
_minNumCharsInISToTurnOnCompression;
- }
- return false;
- }
- });
- if (controllerMetrics != null) {
- controllerMetrics.addMeteredValue(resourceName,
ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
- controllerMetrics.addTimedValue(resourceName,
ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
- System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
- }
- return idealStateWrapper._idealState;
- } catch (Exception e) {
- if (controllerMetrics != null) {
- controllerMetrics.addMeteredValue(resourceName,
ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
- }
- throw new RuntimeException("Caught exception while updating ideal state
for resource: " + resourceName, e);
- }
+ public static IdealState updateIdealState(HelixManager helixManager, String
resourceName,
+ Function<IdealState, IdealState> updater, RetryPolicy retryPolicy) {
+ return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName,
updater, retryPolicy, false);
}
- private static class IdealStateWrapper {
- IdealState _idealState;
+ public static IdealState updateIdealState(HelixManager helixManager, String
resourceName,
+ Function<IdealState, IdealState> updater, RetryPolicy retryPolicy,
boolean noChangeOk) {
+ return IDEAL_STATE_GROUP_COMMIT.commit(helixManager, resourceName,
updater, retryPolicy, noChangeOk);
}
/**
@@ -235,16 +108,6 @@ public class HelixHelper {
}
}
- public static IdealState updateIdealState(HelixManager helixManager, String
resourceName,
- Function<IdealState, IdealState> updater) {
- return updateIdealState(helixManager, resourceName, updater,
DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
- }
-
- public static IdealState updateIdealState(final HelixManager helixManager,
final String resourceName,
- final Function<IdealState, IdealState> updater, RetryPolicy policy) {
- return updateIdealState(helixManager, resourceName, updater, policy,
false);
- }
-
/**
* Updates broker resource ideal state for the given broker with the given
broker tags. Optional {@code tablesAdded}
* and {@code tablesRemoved} can be provided to track the tables
added/removed during the update.
@@ -554,7 +417,6 @@ public class HelixHelper {
return
instancesWithoutTag.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList());
}
-
public static List<InstanceConfig>
getInstancesConfigsWithTag(List<InstanceConfig> instanceConfigs, String tag) {
List<InstanceConfig> instancesWithTag = new ArrayList<>();
for (InstanceConfig instanceConfig : instanceConfigs) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
new file mode 100644
index 0000000000..ea74fb18e2
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/IdealStateGroupCommit.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.helix;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * IdealStateGroupCommit is a utility class to commit group updates to
IdealState.
+ * It is designed to be used in a multi-threaded environment where multiple
threads
+ * may try to update the same IdealState concurrently.
+ * The implementation is shamelessly borrowed from (<a href=
+ *
"https://github.com/apache/helix/blob/helix-1.4.1/helix-core/src/main/java/org/apache/helix/GroupCommit.java"
+ * >HelixGroupCommit</a>).
+ * This is especially useful for updating a large IdealState, where each update may
+ * take a long time: e.g. if updating one IdealState with 100k segments takes
+ * ~4 seconds, then 15 updates take 1 minute and cause other requests
+ * (e.g. segment upload, realtime segment commit, segment deletion, etc.) to time out.
+ */
+public class IdealStateGroupCommit {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IdealStateGroupCommit.class);
+
+ private static final int NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION =
1000;
+ private static final String ENABLE_COMPRESSIONS_KEY = "enableCompression";
+
+ private static int _minNumCharsInISToTurnOnCompression = -1;
+
+ private static class Queue {
+ final AtomicReference<Thread> _running = new AtomicReference<Thread>();
+ final ConcurrentLinkedQueue<Entry> _pending = new
ConcurrentLinkedQueue<Entry>();
+ }
+
+ private static class Entry {
+ final String _resourceName;
+ final Function<IdealState, IdealState> _updater;
+ IdealState _updatedIdealState = null;
+ AtomicBoolean _sent = new AtomicBoolean(false);
+
+ Entry(String resourceName, Function<IdealState, IdealState> updater) {
+ _resourceName = resourceName;
+ _updater = updater;
+ }
+ }
+
+ private final Queue[] _queues = new Queue[100];
+
+ /**
+ * Set up a group committer and its associated queues
+ */
+ public IdealStateGroupCommit() {
+ // Don't use Arrays.fill();
+ for (int i = 0; i < _queues.length; i++) {
+ _queues[i] = new Queue();
+ }
+ }
+
+ private Queue getQueue(String resourceName) {
+ return _queues[(resourceName.hashCode() & Integer.MAX_VALUE) %
_queues.length];
+ }
+
+ public static synchronized void setMinNumCharsInISToTurnOnCompression(int
minNumChars) {
+ _minNumCharsInISToTurnOnCompression = minNumChars;
+ }
+
+ /**
+ * Do a group update for idealState associated with a given resource key
+ * @param helixManager helixManager with the ability to pull from the current data
+ * @param resourceName the resource name to be updated
+ * @param updater the idealState updater to be applied
+ * @return IdealState if the update is successful, null if not
+ */
+ public IdealState commit(HelixManager helixManager, String resourceName,
+ Function<IdealState, IdealState> updater, RetryPolicy retryPolicy,
boolean noChangeOk) {
+ Queue queue = getQueue(resourceName);
+ Entry entry = new Entry(resourceName, updater);
+
+ queue._pending.add(entry);
+ while (!entry._sent.get()) {
+ if (queue._running.compareAndSet(null, Thread.currentThread())) {
+ ArrayList<Entry> processed = new ArrayList<>();
+ try {
+ if (queue._pending.peek() == null) {
+ // All pending entries have been processed, the updatedIdealState
should be set.
+ return entry._updatedIdealState;
+ }
+ // remove from queue
+ Entry first = queue._pending.poll();
+ processed.add(first);
+ String mergedResourceName = first._resourceName;
+ HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+ PropertyKey idealStateKey =
dataAccessor.keyBuilder().idealStates(resourceName);
+ IdealState idealState = dataAccessor.getProperty(idealStateKey);
+
+ // Make a copy of the idealState above to pass it to the updater
+ // NOTE: new IdealState(idealState.getRecord()) does not work
because it's shallow copy for map fields and
+ // list fields
+ IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
+
+ /**
+ * If the local cache does not contain a value, need to check if
there is a
+ * value in ZK; use it as initial value if exists
+ */
+ IdealState updatedIdealState = first._updater.apply(idealStateCopy);
+ first._updatedIdealState = updatedIdealState;
+ Iterator<Entry> it = queue._pending.iterator();
+ while (it.hasNext()) {
+ Entry ent = it.next();
+ if (!ent._resourceName.equals(mergedResourceName)) {
+ continue;
+ }
+ processed.add(ent);
+ updatedIdealState = ent._updater.apply(idealStateCopy);
+ ent._updatedIdealState = updatedIdealState;
+ it.remove();
+ }
+ IdealState finalUpdatedIdealState = updatedIdealState;
+ updateIdealState(helixManager, resourceName, anyIdealState ->
finalUpdatedIdealState,
+ retryPolicy, noChangeOk);
+ } finally {
+ queue._running.set(null);
+ for (Entry e : processed) {
+ synchronized (e) {
+ e._sent.set(true);
+ e.notify();
+ }
+ }
+ }
+ } else {
+ synchronized (entry) {
+ try {
+ entry.wait(10);
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while committing change, resourceName: "
+ resourceName + ", updater: " + updater,
+ e);
+ // Restore interrupt status
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
+ }
+ }
+ return entry._updatedIdealState;
+ }
+
+ private static class IdealStateWrapper {
+ IdealState _idealState;
+ }
+
+ /**
+ * Updates the ideal state, retrying if necessary in case of concurrent
updates to the ideal state.
+ *
+ * @param helixManager The HelixManager used to interact with the Helix
cluster
+ * @param resourceName The resource for which to update the ideal state
+ * @param updater A function that returns an updated ideal state given an
input ideal state
+ * @return updated ideal state if successful, null if not
+ */
+ private static IdealState updateIdealState(HelixManager helixManager, String
resourceName,
+ Function<IdealState, IdealState> updater, RetryPolicy policy, boolean
noChangeOk) {
+ // NOTE: ControllerMetrics could be null because this method might be
invoked by Broker.
+ ControllerMetrics controllerMetrics = ControllerMetrics.get();
+ try {
+ long startTimeMs = System.currentTimeMillis();
+ IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
+ int retries = policy.attempt(new Callable<>() {
+ @Override
+ public Boolean call() {
+ HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+ PropertyKey idealStateKey =
dataAccessor.keyBuilder().idealStates(resourceName);
+ IdealState idealState = dataAccessor.getProperty(idealStateKey);
+
+ // Make a copy of the idealState above to pass it to the updater
+ // NOTE: new IdealState(idealState.getRecord()) does not work
because it's shallow copy for map fields and
+ // list fields
+ IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);
+ IdealState updatedIdealState;
+ try {
+ updatedIdealState = updater.apply(idealStateCopy);
+ } catch (HelixHelper.PermanentUpdaterException e) {
+ LOGGER.error("Caught permanent exception while updating ideal
state for resource: {}", resourceName, e);
+ throw e;
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while updating ideal state for
resource: {}", resourceName, e);
+ return false;
+ }
+
+ // If there are changes to apply, apply them
+ if (updatedIdealState != null &&
!idealState.equals(updatedIdealState)) {
+ ZNRecord updatedZNRecord = updatedIdealState.getRecord();
+
+ // Update number of partitions
+ int numPartitions = updatedZNRecord.getMapFields().size();
+ updatedIdealState.setNumPartitions(numPartitions);
+
+ // If the ideal state is large enough, enable compression
+ boolean enableCompression = shouldCompress(updatedIdealState);
+ if (enableCompression) {
+ updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true);
+ } else {
+
updatedZNRecord.getSimpleFields().remove(ENABLE_COMPRESSIONS_KEY);
+ }
+
+ // Check version and set ideal state
+ try {
+ if (dataAccessor.getBaseDataAccessor()
+ .set(idealStateKey.getPath(), updatedZNRecord,
idealState.getRecord().getVersion(),
+ AccessOption.PERSISTENT)) {
+ idealStateWrapper._idealState = updatedIdealState;
+ return true;
+ } else {
+ LOGGER.warn("Failed to update ideal state for resource: {}",
resourceName);
+ return false;
+ }
+ } catch (ZkBadVersionException e) {
+ LOGGER.warn("Version changed while updating ideal state for
resource: {}", resourceName);
+ return false;
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while updating ideal state for
resource: {} (compressed={})", resourceName,
+ enableCompression, e);
+ return false;
+ }
+ } else {
+ if (noChangeOk) {
+ LOGGER.info("Idempotent or null ideal state update for resource
{}, skipping update.", resourceName);
+ } else {
+ LOGGER.warn("Idempotent or null ideal state update for resource
{}, skipping update.", resourceName);
+ }
+ idealStateWrapper._idealState = idealState;
+ return true;
+ }
+ }
+
+ private boolean shouldCompress(IdealState is) {
+ if (is.getNumPartitions() >
NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) {
+ return true;
+ }
+
+ // Find the number of characters in one partition in idealstate, and
extrapolate
+ // to estimate the number of characters.
+ // We could serialize the znode to determine the exact size, but
that would mean serializing every
+ // idealstate znode twice. We avoid some extra GC by estimating the
size instead. Such estimations
+ // should be good for most installations that have similar segment
and instance names.
+ Iterator<String> it = is.getPartitionSet().iterator();
+ if (it.hasNext()) {
+ String partitionName = it.next();
+ int numChars = partitionName.length();
+ Map<String, String> stateMap =
is.getInstanceStateMap(partitionName);
+ for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+ numChars += entry.getKey().length();
+ numChars += entry.getValue().length();
+ }
+ numChars *= is.getNumPartitions();
+ return _minNumCharsInISToTurnOnCompression > 0 && numChars >
_minNumCharsInISToTurnOnCompression;
+ }
+ return false;
+ }
+ });
+ if (controllerMetrics != null) {
+ controllerMetrics.addMeteredValue(resourceName,
ControllerMeter.IDEAL_STATE_UPDATE_RETRY, retries);
+ controllerMetrics.addTimedValue(resourceName,
ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS,
+ System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+ controllerMetrics.addMeteredValue(resourceName,
ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L);
+ }
+ return idealStateWrapper._idealState;
+ } catch (Exception e) {
+ if (controllerMetrics != null) {
+ controllerMetrics.addMeteredValue(resourceName,
ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
+ }
+ throw new RuntimeException("Caught exception while updating ideal state
for resource: " + resourceName, e);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index eae4276b84..5e4ff8751f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -76,6 +76,7 @@ import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.common.utils.log.DummyLogFileServer;
import org.apache.pinot.common.utils.log.LocalLogFileServer;
@@ -213,7 +214,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));
setupHelixSystemProperties();
-
HelixHelper.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression());
+
IdealStateGroupCommit.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression());
_listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config);
_controllerMode = _config.getControllerMode();
inferHostnameIfNeeded(_config);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index bc7d16bde6..dee89efd64 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -129,15 +129,18 @@ import static
org.apache.pinot.spi.utils.CommonConstants.DATABASE;
import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
-@Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY),
- @Authorization(value = DATABASE)})
+@Api(tags = Constants.TABLE_TAG, authorizations = {
+ @Authorization(value = SWAGGER_AUTHORIZATION_KEY),
+ @Authorization(value = DATABASE)
+})
@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
@ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
key = SWAGGER_AUTHORIZATION_KEY,
description = "The format of the key is ```\"Basic <token>\" or
\"Bearer <token>\"```"),
@ApiKeyAuthDefinition(name = DATABASE, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
description = "Database context passed through http header. If no
context is provided 'default' database "
- + "context will be considered.")}))
+ + "context will be considered.")
+}))
@Path("/")
public class PinotTableRestletResource {
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index c7eeb341e1..2b59109b89 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -212,7 +212,6 @@ public class PinotHelixResourceManager {
private final Map<String, Map<String, Long>> _segmentCrcMap = new
HashMap<>();
private final Map<String, Map<String, Integer>>
_lastKnownSegmentMetadataVersionMap = new HashMap<>();
- private final Object[] _idealStateUpdaterLocks;
private final Object[] _lineageUpdaterLocks;
private final LoadingCache<String, String> _instanceAdminEndpointCache;
@@ -256,10 +255,6 @@ public class PinotHelixResourceManager {
return InstanceUtils.getServerAdminEndpoint(instanceConfig);
}
});
- _idealStateUpdaterLocks = new
Object[DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE];
- for (int i = 0; i < _idealStateUpdaterLocks.length; i++) {
- _idealStateUpdaterLocks[i] = new Object();
- }
_lineageUpdaterLocks = new Object[DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE];
for (int i = 0; i < _lineageUpdaterLocks.length; i++) {
_lineageUpdaterLocks[i] = new Object();
@@ -1018,16 +1013,13 @@ public class PinotHelixResourceManager {
LOGGER.info("Trying to delete segments: {} from table: {} ",
segmentNames, tableNameWithType);
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix",
tableNameWithType);
-
- synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
- HelixHelper.removeSegmentsFromIdealState(_helixZkManager,
tableNameWithType, segmentNames);
- if (retentionPeriod != null) {
- _segmentDeletionManager.deleteSegments(tableNameWithType,
segmentNames,
- TimeUtils.convertPeriodToMillis(retentionPeriod));
- } else {
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- _segmentDeletionManager.deleteSegments(tableNameWithType,
segmentNames, tableConfig);
- }
+ HelixHelper.removeSegmentsFromIdealState(_helixZkManager,
tableNameWithType, segmentNames);
+ if (retentionPeriod != null) {
+ _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames,
+ TimeUtils.convertPeriodToMillis(retentionPeriod));
+ } else {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ _segmentDeletionManager.deleteSegments(tableNameWithType,
segmentNames, tableConfig);
}
return PinotResourceManagerResponse.success("Segment " + segmentNames +
" deleted");
} catch (final Exception e) {
@@ -1980,13 +1972,11 @@ public class PinotHelixResourceManager {
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
String replicationConfigured =
Integer.toString(tableConfig.getReplication());
if (!idealState.getReplicas().equals(replicationConfigured)) {
- synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
- HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is ->
{
- assert is != null;
- is.setReplicas(replicationConfigured);
- return is;
- }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
- }
+ HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> {
+ assert is != null;
+ is.setReplicas(replicationConfigured);
+ return is;
+ }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
}
// Assign instances
@@ -2328,26 +2318,24 @@ public class PinotHelixResourceManager {
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager,
tableConfig, _controllerMetrics);
- synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
- Map<InstancePartitionsType, InstancePartitions>
finalInstancePartitionsMap = instancePartitionsMap;
- HelixHelper.updateIdealState(_helixZkManager, tableNameWithType,
idealState -> {
- assert idealState != null;
- Map<String, Map<String, String>> currentAssignment =
idealState.getRecord().getMapFields();
- if (currentAssignment.containsKey(segmentName)) {
- LOGGER.warn("Segment: {} already exists in the IdealState for
table: {}, do not update", segmentName,
- tableNameWithType);
- } else {
- List<String> assignedInstances =
- segmentAssignment.assignSegment(segmentName,
currentAssignment, finalInstancePartitionsMap);
- LOGGER.info("Assigning segment: {} to instances: {} for table:
{}", segmentName, assignedInstances,
- tableNameWithType);
- currentAssignment.put(segmentName,
- SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
- }
- return idealState;
- });
- LOGGER.info("Added segment: {} to IdealState for table: {}",
segmentName, tableNameWithType);
- }
+ Map<InstancePartitionsType, InstancePartitions>
finalInstancePartitionsMap = instancePartitionsMap;
+ HelixHelper.updateIdealState(_helixZkManager, tableNameWithType,
idealState -> {
+ assert idealState != null;
+ Map<String, Map<String, String>> currentAssignment =
idealState.getRecord().getMapFields();
+ if (currentAssignment.containsKey(segmentName)) {
+ LOGGER.warn("Segment: {} already exists in the IdealState for table:
{}, do not update", segmentName,
+ tableNameWithType);
+ } else {
+ List<String> assignedInstances =
+ segmentAssignment.assignSegment(segmentName, currentAssignment,
finalInstancePartitionsMap);
+ LOGGER.info("Assigning segment: {} to instances: {} for table: {}",
segmentName, assignedInstances,
+ tableNameWithType);
+ currentAssignment.put(segmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(assignedInstances,
SegmentStateModel.ONLINE));
+ }
+ return idealState;
+ });
+ LOGGER.info("Added segment: {} to IdealState for table: {}",
segmentName, tableNameWithType);
} catch (Exception e) {
LOGGER.error(
"Caught exception while adding segment: {} to IdealState for table:
{}, deleting segment ZK metadata",
@@ -2402,10 +2390,6 @@ public class PinotHelixResourceManager {
return ((upsertConfig != null) && upsertConfig.getMode() !=
UpsertConfig.Mode.NONE);
}
- public Object getIdealStateUpdaterLock(String tableNameWithType) {
- return _idealStateUpdaterLocks[(tableNameWithType.hashCode() &
Integer.MAX_VALUE) % _idealStateUpdaterLocks.length];
- }
-
public Object getLineageUpdaterLock(String tableNameWithType) {
return _lineageUpdaterLocks[(tableNameWithType.hashCode() &
Integer.MAX_VALUE) % _lineageUpdaterLocks.length];
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 910291ff00..76bef151a0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -585,11 +585,9 @@ public class PinotLLCRealtimeSegmentManager {
// the idealstate update fails due to contention. We serialize the updates
to the idealstate
// to reduce this contention. We may still contend with RetentionManager,
or other updates
// to idealstate from other controllers, but then we have the retry
mechanism to get around that.
- synchronized
(_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
- idealState =
- updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newConsumingSegmentName,
- segmentAssignment, instancePartitionsMap);
- }
+ idealState =
+ updateIdealStateOnSegmentCompletion(realtimeTableName,
committingSegmentName, newConsumingSegmentName,
+ segmentAssignment, instancePartitionsMap);
long endTimeNs = System.nanoTime();
LOGGER.info(
@@ -816,24 +814,22 @@ public class PinotLLCRealtimeSegmentManager {
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
String segmentName = llcSegmentName.getSegmentName();
LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}",
segmentName, instanceName);
- synchronized
(_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
- try {
- HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
- assert idealState != null;
- Map<String, String> stateMap =
idealState.getInstanceStateMap(segmentName);
- String state = stateMap.get(instanceName);
- if (SegmentStateModel.CONSUMING.equals(state)) {
- stateMap.put(instanceName, SegmentStateModel.OFFLINE);
- } else {
- LOGGER.info("Segment {} in state {} when trying to register
consumption stop from {}", segmentName, state,
- instanceName);
- }
- return idealState;
- }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
- } catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
- throw e;
- }
+ try {
+ HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
+ assert idealState != null;
+ Map<String, String> stateMap =
idealState.getInstanceStateMap(segmentName);
+ String state = stateMap.get(instanceName);
+ if (SegmentStateModel.CONSUMING.equals(state)) {
+ stateMap.put(instanceName, SegmentStateModel.OFFLINE);
+ } else {
+ LOGGER.info("Segment {} in state {} when trying to register
consumption stop from {}", segmentName, state,
+ instanceName);
+ }
+ return idealState;
+ }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
+ } catch (Exception e) {
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
+ throw e;
}
// We know that we have successfully set the idealstate to be OFFLINE.
// We can now do a best effort to reset the externalview to be OFFLINE if
it is in ERROR state.
@@ -925,33 +921,31 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
- synchronized
(_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
- HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
- assert idealState != null;
- boolean isTableEnabled = idealState.isEnabled();
- boolean isTablePaused = isTablePaused(idealState);
- boolean offsetsHaveToChange = offsetCriteria != null;
- if (isTableEnabled && !isTablePaused) {
- List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
- offsetsHaveToChange
- ? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
- : getPartitionGroupConsumptionStatusList(idealState,
streamConfig);
- OffsetCriteria originalOffsetCriteria =
streamConfig.getOffsetCriteria();
- // Read the smallest offset when a new partition is detected
- streamConfig.setOffsetCriteria(
- offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
- streamConfig.setOffsetCriteria(originalOffsetCriteria);
- return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, newPartitionGroupMetadataList,
- recreateDeletedConsumingSegment, offsetCriteria);
- } else {
- LOGGER.info("Skipping LLC segments validation for table: {},
isTableEnabled: {}, isTablePaused: {}",
- realtimeTableName, isTableEnabled, isTablePaused);
- return idealState;
- }
- }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
- }
+ HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState
-> {
+ assert idealState != null;
+ boolean isTableEnabled = idealState.isEnabled();
+ boolean isTablePaused = isTablePaused(idealState);
+ boolean offsetsHaveToChange = offsetCriteria != null;
+ if (isTableEnabled && !isTablePaused) {
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ offsetsHaveToChange
+ ? Collections.emptyList() // offsets from metadata are not
valid anymore; fetch for all partitions
+ : getPartitionGroupConsumptionStatusList(idealState,
streamConfig);
+ OffsetCriteria originalOffsetCriteria =
streamConfig.getOffsetCriteria();
+ // Read the smallest offset when a new partition is detected
+ streamConfig.setOffsetCriteria(
+ offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ streamConfig.setOffsetCriteria(originalOffsetCriteria);
+ return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, newPartitionGroupMetadataList,
+ recreateDeletedConsumingSegment, offsetCriteria);
+ } else {
+ LOGGER.info("Skipping LLC segments validation for table: {},
isTableEnabled: {}, isTablePaused: {}",
+ realtimeTableName, isTableEnabled, isTablePaused);
+ return idealState;
+ }
+ }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
}
/**
@@ -961,7 +955,7 @@ public class PinotLLCRealtimeSegmentManager {
IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName,
String committingSegmentName,
String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- return HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
+ return HelixHelper.updateIdealState(_helixManager, realtimeTableName,
idealState -> {
assert idealState != null;
// When segment completion begins, the zk metadata is updated, followed
by ideal state.
// We allow only {@link
PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a
segment to
@@ -1760,16 +1754,13 @@ public class PinotLLCRealtimeSegmentManager {
PauseState.ReasonCode reasonCode, @Nullable String comment) {
PauseState pauseState = new PauseState(pause, reasonCode, comment,
new Timestamp(System.currentTimeMillis()).toString());
- IdealState updatedIdealState;
- synchronized
(_helixResourceManager.getIdealStateUpdaterLock(tableNameWithType)) {
- updatedIdealState = HelixHelper.updateIdealState(_helixManager,
tableNameWithType, idealState -> {
- ZNRecord znRecord = idealState.getRecord();
- znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
- // maintain for backward compatibility
- znRecord.setSimpleField(IS_TABLE_PAUSED,
Boolean.valueOf(pause).toString());
- return new IdealState(znRecord);
- }, RetryPolicies.noDelayRetryPolicy(3));
- }
+ IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager,
tableNameWithType, idealState -> {
+ ZNRecord znRecord = idealState.getRecord();
+ znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
+ // maintain for backward compatibility
+ znRecord.setSimpleField(IS_TABLE_PAUSED,
Boolean.valueOf(pause).toString());
+ return new IdealState(znRecord);
+ }, RetryPolicies.noDelayRetryPolicy(3));
LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. "
+ "Also set 'isTablePaused' to {} for backward compatibility.",
pauseState, tableNameWithType, pause);
return updatedIdealState;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
new file mode 100644
index 0000000000..ffe39764a4
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/IdealStateGroupCommitTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.helix.IdealStateGroupCommit;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Submits {@code NUM_UPDATES} concurrent IdealState updates for a single table through one shared
+ * {@link IdealStateGroupCommit} and verifies that every update lands while the
+ * {@code IDEAL_STATE_UPDATE_SUCCESS} meter for the table stays below {@code NUM_UPDATES}.
+ */
+public class IdealStateGroupCommitTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IdealStateGroupCommitTest.class);
+  private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance();
+  private static final String TABLE_NAME = "potato_OFFLINE";
+  private static final int NUM_UPDATES = 2400;
+  private static final int NUM_THREADS = 400;
+  private static final long POLL_INTERVAL_MS = 500L;
+  // Upper bound on the wait for all updates to become visible, so that a lost update fails the test instead of
+  // hanging it. NOTE(review): the value is a generous guess; tune it to the CI environment if needed.
+  private static final long UPDATE_TIMEOUT_MS = 300_000L;
+
+  /**
+   * Sets up the shared {@link ControllerTest} state and registers an IdealState resource for {@code TABLE_NAME}
+   * with zero partitions, so the partition count observed by the test starts at 0.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TEST_INSTANCE.setupSharedStateAndValidate();
+
+    IdealState idealState = new IdealState(TABLE_NAME);
+    idealState.setStateModelDefRef("OnlineOffline");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    idealState.setReplicas("1");
+    idealState.setNumPartitions(0);
+    TEST_INSTANCE.getHelixAdmin().addResource(TEST_INSTANCE.getHelixClusterName(), TABLE_NAME, idealState);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    TEST_INSTANCE.cleanup();
+  }
+
+  /**
+   * Runs {@code NUM_UPDATES} {@link IdealStateUpdater} tasks on a fixed thread pool, polls the table IdealState
+   * until it reports {@code NUM_UPDATES} partitions, and then checks the success meter.
+   *
+   * @throws InterruptedException if the thread is interrupted while sleeping between polls
+   */
+  @Test
+  public void testGroupCommit()
+      throws InterruptedException {
+    final IdealStateGroupCommit commit = new IdealStateGroupCommit();
+    ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
+    try {
+      for (int i = 0; i < NUM_UPDATES; i++) {
+        executorService.submit(new IdealStateUpdater(TEST_INSTANCE.getHelixManager(), commit, TABLE_NAME, i));
+      }
+
+      long deadlineMs = System.currentTimeMillis() + UPDATE_TIMEOUT_MS;
+      IdealState idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
+      while (idealState.getNumPartitions() < NUM_UPDATES) {
+        if (System.currentTimeMillis() > deadlineMs) {
+          Assert.fail("Timed out after " + UPDATE_TIMEOUT_MS + "ms waiting for " + NUM_UPDATES
+              + " IdealState updates, last observed partition count: " + idealState.getNumPartitions());
+        }
+        Thread.sleep(POLL_INTERVAL_MS);
+        idealState = HelixHelper.getTableIdealState(TEST_INSTANCE.getHelixManager(), TABLE_NAME);
+      }
+      Assert.assertEquals(idealState.getNumPartitions(), NUM_UPDATES);
+
+      ControllerMetrics controllerMetrics = ControllerMetrics.get();
+      long idealStateUpdateSuccessCount =
+          controllerMetrics.getMeteredTableValue(TABLE_NAME, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS).count();
+      Assert.assertTrue(idealStateUpdateSuccessCount < NUM_UPDATES);
+      LOGGER.info("{} IdealState update are successfully commited with {} times zk updates.", NUM_UPDATES,
+          idealStateUpdateSuccessCount);
+    } finally {
+      // Stop accepting work and let the pool threads exit once submitted tasks drain; without this the
+      // non-daemon worker threads outlive the test.
+      executorService.shutdown();
+    }
+  }
+}
+
+class IdealStateUpdater implements Runnable {
+ private final HelixManager _helixManager;
+ private final IdealStateGroupCommit _commit;
+ private final String _tableName;
+ private final int _i;
+
+ public IdealStateUpdater(HelixManager helixManager, IdealStateGroupCommit
commit, String tableName, int i) {
+ _helixManager = helixManager;
+ _commit = commit;
+ _tableName = tableName;
+ _i = i;
+ }
+
+ @Override
+ public void run() {
+ _commit.commit(_helixManager, _tableName, new Function<IdealState,
IdealState>() {
+ @Override
+ public IdealState apply(IdealState idealState) {
+ idealState.setPartitionState("test_id" + _i, "test_id" + _i, "ONLINE");
+ return idealState;
+ }
+ }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f), false);
+ HelixHelper.getTableIdealState(_helixManager, _tableName);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 16d1983a21..b1f8520aad 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -90,7 +90,6 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
import static
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -202,7 +201,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void testCommitSegment() {
// Set up a new table with 2 replicas, 5 instances, 4 partition
PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
-
when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new
Object());
FakePinotLLCRealtimeSegmentManager segmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
setUpNewTable(segmentManager, 2, 5, 4);
@@ -325,7 +323,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void testSetUpNewPartitions() {
// Set up a new table with 2 replicas, 5 instances, 0 partition
PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
-
when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new
Object());
FakePinotLLCRealtimeSegmentManager segmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
setUpNewTable(segmentManager, 2, 5, 0);
@@ -499,7 +496,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void testRepairs() {
// Set up a new table with 2 replicas, 5 instances, 4 partitions
PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
-
when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new
Object());
FakePinotLLCRealtimeSegmentManager segmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
setUpNewTable(segmentManager, 2, 5, 4);
@@ -894,7 +890,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void testCommitSegmentMetadata() {
// Set up a new table with 2 replicas, 5 instances, 4 partition
PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
-
when(mockHelixResourceManager.getIdealStateUpdaterLock(anyString())).thenReturn(new
Object());
FakePinotLLCRealtimeSegmentManager segmentManager =
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
setUpNewTable(segmentManager, 2, 5, 4);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]