This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 51904fb Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422) 51904fb is described below commit 51904fb99e4368c430503b2bf0eb5c42ce5663d1 Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Wed Mar 30 22:24:51 2022 -0700 Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422) Add TableDeletionMessage, the tableDataManager can only be removed upon receiving the TableDeletionMessage when the table is dropped. --- .../common/messages/TableDeletionMessage.java | 46 ++++++++++++++++++ .../apache/pinot/common/metrics/ServerMeter.java | 1 + .../helix/core/PinotHelixResourceManager.java | 54 ++++++++++++++++++++-- .../core/data/manager/InstanceDataManager.java | 6 +++ .../pinot/integration/tests/ClusterTest.java | 4 ++ .../tests/BaseClusterIntegrationTestSet.java | 22 +++++++++ .../tests/OfflineClusterIntegrationTest.java | 20 ++++++++ .../tests/RealtimeClusterIntegrationTest.java | 2 + .../server/starter/helix/BaseServerStarter.java | 6 +++ .../starter/helix/HelixInstanceDataManager.java | 39 +++++++++++++--- .../helix/SegmentMessageHandlerFactory.java | 25 ++++++++++ 11 files changed, 216 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java new file mode 100644 index 0000000..5b05ab0 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import com.google.common.base.Preconditions; +import java.util.UUID; +import javax.annotation.Nonnull; +import org.apache.helix.model.Message; + +/** + * This Helix message is sent from the controller to the servers to remove TableDataManager when the table is deleted. + */ +public class TableDeletionMessage extends Message { + public static final String DELETE_TABLE_MSG_SUB_TYPE = "DELETE_TABLE"; + + public TableDeletionMessage(@Nonnull String tableNameWithType) { + super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); + setResourceName(tableNameWithType); + setMsgSubType(DELETE_TABLE_MSG_SUB_TYPE); + // Give it infinite time to process the message, as long as session is alive + setExecutionTimeout(-1); + } + + public TableDeletionMessage(Message message) { + super(message.getRecord()); + String msgSubType = message.getMsgSubType(); + Preconditions.checkArgument(msgSubType.equals(DELETE_TABLE_MSG_SUB_TYPE), + "Invalid message sub type: " + msgSubType + " for TableDeletionMessage"); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 4a9b290..e6512df 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -33,6 +33,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { QUERY_EXECUTION_EXCEPTIONS("exceptions", false), HELIX_ZOOKEEPER_RECONNECTS("reconnects", true), DELETED_SEGMENT_COUNT("segments", false), + DELETE_TABLE_FAILURES("tables", false), REALTIME_ROWS_CONSUMED("rows", true), INVALID_REALTIME_ROWS_DROPPED("rows", false), REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ecbd3b7..3b90117 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -91,6 +91,7 @@ import org.apache.pinot.common.messages.RoutingTableRebuildMessage; import org.apache.pinot.common.messages.SegmentRefreshMessage; import org.apache.pinot.common.messages.SegmentReloadMessage; import org.apache.pinot.common.messages.TableConfigRefreshMessage; +import org.apache.pinot.common.messages.TableDeletionMessage; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -1757,6 +1758,17 @@ public class PinotHelixResourceManager { HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, offlineTableName); LOGGER.info("Deleting table {}: Removed from broker resource", offlineTableName); + // Drop the table on servers + // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers + // instead of servers. This is because if externalview gets updated with significant delay, + // we may have the race condition for table recreation that the new table will use the old states + // (old table data manager) on the servers. + // Steps needed: + // 1. Drop the helix resource first (set idealstate as null) + // 2. Wait for the externalview to converge + // 3. Get servers for the tenant, and send delete table message to these servers + deleteTableOnServer(offlineTableName); + // Drop the table if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName)) { _helixAdmin.dropResource(_helixClusterName, offlineTableName); @@ -1800,6 +1812,11 @@ public class PinotHelixResourceManager { HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, realtimeTableName); LOGGER.info("Deleting table {}: Removed from broker resource", realtimeTableName); + // Drop the table on servers + // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers + // instead of servers. Follow the same steps for offline tables. + deleteTableOnServer(realtimeTableName); + // Cache the state and drop the table Set<String> instancesForTable = null; if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(realtimeTableName)) { @@ -2053,6 +2070,36 @@ public class PinotHelixResourceManager { return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata); } + /** + * Delete the table on servers by sending table deletion message + */ + private void deleteTableOnServer(String tableNameWithType) { + LOGGER.info("Sending delete table message for table: {}", tableNameWithType); + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setInstanceName("%"); + recipientCriteria.setResource(tableNameWithType); + recipientCriteria.setSessionSpecific(true); + TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType); + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + + // Externalview can be null for newly created table, skip sending the message + if (_helixZkManager.getHelixDataAccessor() + .getProperty(_helixZkManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType)) == null) { + LOGGER.warn("No delete table message sent for newly created table: {} as the externalview is null.", + tableNameWithType); + return; + } + // Infinite timeout on the recipient + int timeoutMs = -1; + int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} delete table messages for table: {}", numMessagesSent, tableNameWithType); + } else { + LOGGER.warn("No delete table message sent for table: {}", tableNameWithType); + } + } + public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata, SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { @@ -3285,10 +3332,11 @@ public class PinotHelixResourceManager { tableNameWithType, segmentsToCheck)); } - private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) { + public Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) { ExternalView externalView = getTableExternalView(tableNameWithType); - Preconditions - .checkState(externalView != null, String.format("External view is null for table (%s)", tableNameWithType)); + if (externalView == null) { + return Collections.emptySet(); + } Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields(); Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 14d98b3..a879953 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -67,6 +67,12 @@ public interface InstanceDataManager { void shutDown(); /** + * Delete a table. + */ + void deleteTable(String tableNameWithType) + throws Exception; + + /** * Adds a segment from local disk into an OFFLINE table. */ void addOfflineSegment(String offlineTableName, String segmentName, File indexDir) diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index f706477..ed28045 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -245,6 +245,10 @@ public abstract class ClusterTest extends ControllerTest { } } + protected List<HelixServerStarter> getServerStarters() { + return _serverStarters; + } + protected void startServerHttps() { FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR)); _serverStarters = new ArrayList<>(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index 9d12aca..58019ce 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -30,6 +30,7 @@ import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.query.utils.idset.IdSets; +import org.apache.pinot.server.starter.helix.HelixServerStarter; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.MetricFieldSpec; @@ -73,6 +74,27 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati } /** + * Test server table data manager deletion after the table is dropped + */ + protected void cleanupTestTableDataManager(String tableNameWithType) + throws Exception { + List<HelixServerStarter> serverStarters = getServerStarters(); + TestUtils.waitForCondition(aVoid -> { + try { + for (HelixServerStarter serverStarter : serverStarters) { + if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType) + != null) { + return false; + } + } + return true; + } catch (Exception e) { + throw new RuntimeException(e); + } + }, 600_000L, "Failed to delete table data managers"); + } + + /** * Test hardcoded queries. * <p>NOTE: * <p>For queries with <code>LIMIT</code>, need to remove limit or add <code>LIMIT 10000</code> to the H2 SQL query diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 5936f68..eb2994c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -32,7 +32,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.helix.model.IdealState; @@ -71,6 +73,8 @@ import org.testng.annotations.Test; import static org.apache.pinot.common.function.scalar.StringFunctions.decodeUrl; import static org.apache.pinot.common.function.scalar.StringFunctions.encodeUrl; +import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS; +import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS; import static org.testng.Assert.*; @@ -409,7 +413,23 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(segmentsZKMetadata.size(), 1); assertNotEquals(segmentsZKMetadata.get(0).getRefreshTime(), Long.MIN_VALUE); } + waitForNumOfSegmentsBecomeOnline(offlineTableName, 1); dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE); + cleanupTestTableDataManager(offlineTableName); + } + + private void waitForNumOfSegmentsBecomeOnline(String tableNameWithType, int numSegments) + throws InterruptedException, TimeoutException { + long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS; + do { + Set<String> onlineSegments = _helixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType); + if (onlineSegments.size() == numSegments) { + return; + } + Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); + } while (System.currentTimeMillis() < endTimeMs); + throw new TimeoutException(String + .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType)); } @Test(dependsOnMethods = "testRangeIndexTriggering") diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java index b3c3df4..6208c36 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java @@ -25,6 +25,7 @@ import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -167,6 +168,7 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe public void tearDown() throws Exception { dropRealtimeTable(getTableName()); + cleanupTestTableDataManager(TableNameBuilder.REALTIME.tableNameWithType(getTableName())); stopServer(); stopBroker(); stopController(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 19467eb..d940ccb 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.server.starter.helix; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.ArrayList; @@ -718,6 +719,11 @@ public abstract class BaseServerStarter implements ServiceStartable { return _serverConf; } + @VisibleForTesting + public ServerInstance getServerInstance() { + return _serverInstance; + } + /** * Helper method to set system resource info into instance config. * diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 21d3c90..3cc1296 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import javax.annotation.Nullable; @@ -39,6 +40,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; +import org.apache.helix.model.ExternalView; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -73,6 +75,9 @@ import org.slf4j.LoggerFactory; @ThreadSafe public class HelixInstanceDataManager implements InstanceDataManager { private static final Logger LOGGER = LoggerFactory.getLogger(HelixInstanceDataManager.class); + // TODO: Make this configurable + private static final long EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 20 * 60_000L; // 20 minutes + private static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second private final ConcurrentHashMap<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap<>(); @@ -185,17 +190,39 @@ public class HelixInstanceDataManager implements InstanceDataManager { } @Override + public void deleteTable(String tableNameWithType) + throws Exception { + // Wait externalview to converge + long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS; + do { + ExternalView externalView = _helixManager.getHelixDataAccessor() + .getProperty(_helixManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType)); + if (externalView == null) { + LOGGER.info("ExternalView converged for the table to delete: {}", tableNameWithType); + _tableDataManagerMap.compute(tableNameWithType, (k, v) -> { + if (v != null) { + v.shutDown(); + LOGGER.info("Removed table: {}", tableNameWithType); + } else { + LOGGER.warn("Failed to find table data manager for table: {}, skip removing the table", tableNameWithType); + } + return null; + }); + return; + } + Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); + } while (System.currentTimeMillis() < endTimeMs); + throw new TimeoutException( + "Timeout while waiting for ExternalView to converge for the table to delete: " + tableNameWithType); + } + + @Override public void removeSegment(String tableNameWithType, String segmentName) { LOGGER.info("Removing segment: {} from table: {}", segmentName, tableNameWithType); _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, v) -> { v.removeSegment(segmentName); LOGGER.info("Removed segment: {} from table: {}", segmentName, k); - if (v.getNumSegments() == 0) { - v.shutDown(); - return null; - } else { - return v; - } + return v; }); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java index d00a558..ed6e9b6 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java @@ -26,6 +26,7 @@ import org.apache.helix.model.Message; import org.apache.pinot.common.Utils; import org.apache.pinot.common.messages.SegmentRefreshMessage; import org.apache.pinot.common.messages.SegmentReloadMessage; +import org.apache.pinot.common.messages.TableDeletionMessage; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.data.manager.InstanceDataManager; @@ -58,6 +59,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), _metrics, context); case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE: return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context); + case TableDeletionMessage.DELETE_TABLE_MSG_SUB_TYPE: + return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context); default: LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType, message.getPartitionName()); @@ -144,6 +147,28 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { } } + private class TableDeletionMessageHandler extends DefaultMessageHandler { + TableDeletionMessageHandler(TableDeletionMessage tableDeletionMessage, ServerMetrics metrics, + NotificationContext context) { + super(tableDeletionMessage, metrics, context); + } + + @Override + public HelixTaskResult handleMessage() + throws InterruptedException { + HelixTaskResult helixTaskResult = new HelixTaskResult(); + _logger.info("Handling table deletion message"); + try { + _instanceDataManager.deleteTable(_tableNameWithType); + helixTaskResult.setSuccess(true); + } catch (Exception e) { + _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1); + Utils.rethrowException(e); + } + return helixTaskResult; + } + } + private static class DefaultMessageHandler extends MessageHandler { final String _segmentName; final String _tableNameWithType; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org