This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new decc4eba2f Properly handle shutdown of TableDataManager (#11380)
decc4eba2f is described below
commit decc4eba2f5f1f4e13ee5f97a62d51518063a7de
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 18 18:39:54 2023 -0700
Properly handle shutdown of TableDataManager (#11380)
---
.../helix/core/PinotHelixResourceManager.java | 170 ++++++---------------
.../core/data/manager/BaseTableDataManager.java | 76 ++++++---
.../core/data/manager/InstanceDataManager.java | 6 -
.../manager/offline/DimensionTableDataManager.java | 10 +-
.../manager/offline/OfflineTableDataManager.java | 1 +
.../manager/realtime/RealtimeTableDataManager.java | 29 ++--
.../tests/BaseClusterIntegrationTest.java | 31 ++--
.../tests/BaseClusterIntegrationTestSet.java | 20 ---
.../tests/BaseRealtimeClusterIntegrationTest.java | 2 +-
.../tests/OfflineClusterIntegrationTest.java | 8 +-
.../local/data/manager/TableDataManager.java | 4 +-
.../starter/helix/HelixInstanceDataManager.java | 82 +++++-----
.../SegmentOnlineOfflineStateModelFactory.java | 125 +++++++++------
13 files changed, 266 insertions(+), 298 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d584136740..1974d952b5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1984,62 +1984,7 @@ public class PinotHelixResourceManager {
}
public void deleteOfflineTable(String tableName, @Nullable String
retentionPeriod) {
- String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
- LOGGER.info("Deleting table {}: Start", offlineTableName);
-
- // Remove the table from brokerResource
- HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager,
offlineTableName);
- LOGGER.info("Deleting table {}: Removed from broker resource",
offlineTableName);
-
- // Drop the table on servers
- // TODO: Make this api idempotent and blocking by waiting for externalview
to converge on controllers
- // instead of servers. This is because if externalview gets updated
with significant delay,
- // we may have the race condition for table recreation that the new
table will use the old states
- // (old table data manager) on the servers.
- // Steps needed:
- // 1. Drop the helix resource first (set idealstate as null)
- // 2. Wait for the externalview to converge
- // 3. Get servers for the tenant, and send delete table message to
these servers
- deleteTableOnServer(offlineTableName);
-
- // Drop the table
- if
(_helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName))
{
- _helixAdmin.dropResource(_helixClusterName, offlineTableName);
- LOGGER.info("Deleting table {}: Removed helix table resource",
offlineTableName);
- }
-
- // Remove all stored segments for the table
- Long retentionPeriodMs = retentionPeriod != null ?
TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
- _segmentDeletionManager.removeSegmentsFromStore(offlineTableName,
getSegmentsFromPropertyStore(offlineTableName),
- retentionPeriodMs);
- LOGGER.info("Deleting table {}: Removed stored segments",
offlineTableName);
-
- // Remove segment metadata
- ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore,
offlineTableName);
- LOGGER.info("Deleting table {}: Removed segment metadata",
offlineTableName);
-
- // Remove instance partitions
- InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
offlineTableName);
- LOGGER.info("Deleting table {}: Removed instance partitions",
offlineTableName);
-
- // Remove tier instance partitions
- InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore,
offlineTableName);
- LOGGER.info("Deleting table {}: Removed tier instance partitions",
offlineTableName);
-
- // Remove segment lineage
- SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore,
offlineTableName);
- LOGGER.info("Deleting table {}: Removed segment lineage",
offlineTableName);
-
- // Remove task related metadata
- MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore,
offlineTableName);
- LOGGER.info("Deleting table {}: Removed all minion task metadata",
offlineTableName);
-
- // Remove table config
- // this should always be the last step for deletion to avoid race
condition in table re-create.
- ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore,
offlineTableName);
- LOGGER.info("Deleting table {}: Removed table config", offlineTableName);
-
- LOGGER.info("Deleting table {}: Finish", offlineTableName);
+ deleteTable(tableName, TableType.OFFLINE, retentionPeriod);
}
public void deleteRealtimeTable(String tableName) {
@@ -2047,73 +1992,64 @@ public class PinotHelixResourceManager {
}
public void deleteRealtimeTable(String tableName, @Nullable String
retentionPeriod) {
- String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
- LOGGER.info("Deleting table {}: Start", realtimeTableName);
+ deleteTable(tableName, TableType.REALTIME, retentionPeriod);
+ }
+
+ public void deleteTable(String tableName, TableType tableType, @Nullable
String retentionPeriod) {
+ String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(tableName);
+ LOGGER.info("Deleting table {}: Start", tableNameWithType);
// Remove the table from brokerResource
- HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager,
realtimeTableName);
- LOGGER.info("Deleting table {}: Removed from broker resource",
realtimeTableName);
+ HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager,
tableNameWithType);
+ LOGGER.info("Deleting table {}: Removed from broker resource",
tableNameWithType);
- // Drop the table on servers
- // TODO: Make this api idempotent and blocking by waiting for externalview
to converge on controllers
- // instead of servers. Follow the same steps for offline tables.
- deleteTableOnServer(realtimeTableName);
+ // Delete the table on servers
+ deleteTableOnServers(tableNameWithType);
- // Cache the state and drop the table
- Set<String> instancesForTable = null;
- if
(_helixAdmin.getResourcesInCluster(_helixClusterName).contains(realtimeTableName))
{
- instancesForTable = getAllInstancesForTable(realtimeTableName);
- _helixAdmin.dropResource(_helixClusterName, realtimeTableName);
- LOGGER.info("Deleting table {}: Removed helix table resource",
realtimeTableName);
- }
+ // Remove ideal state
+
_helixDataAccessor.removeProperty(_keyBuilder.idealStates(tableNameWithType));
+ LOGGER.info("Deleting table {}: Removed ideal state", tableNameWithType);
// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ?
TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
- _segmentDeletionManager.removeSegmentsFromStore(realtimeTableName,
getSegmentsFromPropertyStore(realtimeTableName),
+ _segmentDeletionManager.removeSegmentsFromStore(tableNameWithType,
getSegmentsFromPropertyStore(tableNameWithType),
retentionPeriodMs);
- LOGGER.info("Deleting table {}: Removed stored segments",
realtimeTableName);
+ LOGGER.info("Deleting table {}: Removed stored segments",
tableNameWithType);
// Remove segment metadata
- ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore,
realtimeTableName);
- LOGGER.info("Deleting table {}: Removed segment metadata",
realtimeTableName);
+ ZKMetadataProvider.removeResourceSegmentsFromPropertyStore(_propertyStore,
tableNameWithType);
+ LOGGER.info("Deleting table {}: Removed segment metadata",
tableNameWithType);
// Remove instance partitions
- String rawTableName = TableNameBuilder.extractRawTableName(tableName);
- InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
-
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
- InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
-
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
- LOGGER.info("Deleting table {}: Removed instance partitions",
realtimeTableName);
+ if (tableType == TableType.OFFLINE) {
+ InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
tableNameWithType);
+ } else {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
+ InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
+
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
+ }
+ LOGGER.info("Deleting table {}: Removed instance partitions",
tableNameWithType);
- InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore,
rawTableName);
- LOGGER.info("Deleting table {}: Removed tier instance partitions",
realtimeTableName);
+ // Remove tier instance partitions
+ InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore,
tableNameWithType);
+ LOGGER.info("Deleting table {}: Removed tier instance partitions",
tableNameWithType);
// Remove segment lineage
- SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore,
realtimeTableName);
- LOGGER.info("Deleting table {}: Removed segment lineage",
realtimeTableName);
+ SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore,
tableNameWithType);
+ LOGGER.info("Deleting table {}: Removed segment lineage",
tableNameWithType);
// Remove task related metadata
- MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore,
realtimeTableName);
- LOGGER.info("Deleting table {}: Removed all minion task metadata",
realtimeTableName);
-
- // Remove groupId/partitionId mapping for HLC table
- if (instancesForTable != null) {
- for (String instance : instancesForTable) {
- InstanceZKMetadata instanceZKMetadata =
ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, instance);
- if (instanceZKMetadata != null) {
- instanceZKMetadata.removeResource(realtimeTableName);
- ZKMetadataProvider.setInstanceZKMetadata(_propertyStore,
instanceZKMetadata);
- }
- }
- }
- LOGGER.info("Deleting table {}: Removed groupId/partitionId mapping for
HLC table", realtimeTableName);
+ MinionTaskMetadataUtils.deleteTaskMetadata(_propertyStore,
tableNameWithType);
+ LOGGER.info("Deleting table {}: Removed all minion task metadata",
tableNameWithType);
// Remove table config
- // this should always be the last step for deletion to avoid race
condition in table re-create.
- ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore,
realtimeTableName);
- LOGGER.info("Deleting table {}: Removed table config", realtimeTableName);
+ // NOTE: This should always be the last step for deletion to avoid race
condition in table re-create
+ ZKMetadataProvider.removeResourceConfigFromPropertyStore(_propertyStore,
tableNameWithType);
+ LOGGER.info("Deleting table {}: Removed table config", tableNameWithType);
- LOGGER.info("Deleting table {}: Finish", realtimeTableName);
+ LOGGER.info("Deleting table {}: Finish", tableNameWithType);
}
/**
@@ -2152,15 +2088,6 @@ public class PinotHelixResourceManager {
}
}
- private Set<String> getAllInstancesForTable(String tableNameWithType) {
- Set<String> instanceSet = new HashSet<>();
- IdealState tableIdealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
- for (String partition : tableIdealState.getPartitionSet()) {
- instanceSet.addAll(tableIdealState.getInstanceSet(partition));
- }
- return instanceSet;
- }
-
/**
* Returns the ZK metadata for the given jobId and jobType
* @param jobId the id of the job
@@ -2443,10 +2370,16 @@ public class PinotHelixResourceManager {
}
/**
- * Delete the table on servers by sending table deletion message
+ * Delete the table on servers by sending table deletion messages.
*/
- private void deleteTableOnServer(String tableNameWithType) {
- LOGGER.info("Sending delete table message for table: {}",
tableNameWithType);
+ private void deleteTableOnServers(String tableNameWithType) {
+ // External view can be null for newly created table, skip sending messages
+ if
(_helixDataAccessor.getProperty(_keyBuilder.externalView(tableNameWithType)) ==
null) {
+ LOGGER.warn("No delete table message sent for newly created table: {}
without external view", tableNameWithType);
+ return;
+ }
+
+ LOGGER.info("Sending delete table messages for table: {}",
tableNameWithType);
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
@@ -2455,13 +2388,6 @@ public class PinotHelixResourceManager {
TableDeletionMessage tableDeletionMessage = new
TableDeletionMessage(tableNameWithType);
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
- // Externalview can be null for newly created table, skip sending the
message
- if (_helixZkManager.getHelixDataAccessor()
-
.getProperty(_helixZkManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType))
== null) {
- LOGGER.warn("No delete table message sent for newly created table: {} as
the externalview is null.",
- tableNameWithType);
- return;
- }
// Infinite timeout on the recipient
int timeoutMs = -1;
int numMessagesSent = messagingService.send(recipientCriteria,
tableDeletionMessage, null, timeoutMs);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 0187fb6836..0fb1b26dc3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -179,7 +180,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected abstract void doInit();
@Override
- public void start() {
+ public synchronized void start() {
_logger.info("Starting table data manager for table: {}",
_tableNameWithType);
doStart();
_logger.info("Started table data manager for table: {}",
_tableNameWithType);
@@ -188,7 +189,11 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected abstract void doStart();
@Override
- public void shutDown() {
+ public synchronized void shutDown() {
+ if (_shutDown) {
+ _logger.info("Table data manager for table: {} is already shut down",
_tableNameWithType);
+ return;
+ }
_logger.info("Shutting down table data manager for table: {}",
_tableNameWithType);
_shutDown = true;
doShutdown();
@@ -197,6 +202,18 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected abstract void doShutdown();
+ /**
+ * Releases and removes all segments tracked by the table data manager.
+ */
+ protected void releaseAndRemoveAllSegments() {
+ Iterator<SegmentDataManager> iterator =
_segmentDataManagerMap.values().iterator();
+ while (iterator.hasNext()) {
+ SegmentDataManager segmentDataManager = iterator.next();
+ iterator.remove();
+ releaseSegment(segmentDataManager);
+ }
+ }
+
@Override
public boolean isShutDown() {
return _shutDown;
@@ -214,6 +231,8 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Override
public void addSegment(ImmutableSegment immutableSegment) {
String segmentName = immutableSegment.getSegmentName();
+ Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
+ segmentName, _tableNameWithType);
_logger.info("Adding immutable segment: {} to table: {}", segmentName,
_tableNameWithType);
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.DOCUMENT_COUNT,
immutableSegment.getSegmentMetadata().getTotalDocs());
@@ -232,6 +251,8 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Override
public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
throws Exception {
+ Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
+ indexDir.getName(), _tableNameWithType);
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
indexLoadingConfig.getSchema()));
@@ -251,6 +272,12 @@ public abstract class BaseTableDataManager implements
TableDataManager {
*/
@Override
public void removeSegment(String segmentName) {
+ // Tolerate removeSegment() calls after shutdown (skip instead of throwing) so
that segment removal does not fail when the table is deleted
+ if (_shutDown) {
+ _logger.info("Table data manager is already shut down, skip removing
segment: {} from table: {}", segmentName,
+ _tableNameWithType);
+ return;
+ }
_logger.info("Removing segment: {} from table: {}", segmentName,
_tableNameWithType);
SegmentDataManager segmentDataManager = unregisterSegment(segmentName);
if (segmentDataManager != null) {
@@ -364,6 +391,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
public void reloadSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata zkMetadata,
SegmentMetadata localMetadata, @Nullable Schema schema, boolean
forceDownload)
throws Exception {
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot reload segment: %s of
table: %s", segmentName,
+ _tableNameWithType);
String segmentTier = getSegmentCurrentTier(segmentName);
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
@@ -390,8 +420,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
SegmentDirectory segmentDirectory =
initSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
// We should first try to reuse existing segment directory
- if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier,
segmentDirectory, indexLoadingConfig,
- schema)) {
+ if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier,
segmentDirectory, indexLoadingConfig, schema)) {
LOGGER.info("Reloading segment: {} of table: {} using existing
segment directory as no reprocessing needed",
segmentName, _tableNameWithType);
// No reprocessing needed, reuse the same segment
@@ -432,9 +461,8 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
}
- private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata
segmentZKMetadata,
- String currentSegmentTier, SegmentDirectory segmentDirectory,
IndexLoadingConfig indexLoadingConfig,
- Schema schema)
+ private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata
segmentZKMetadata, String currentSegmentTier,
+ SegmentDirectory segmentDirectory, IndexLoadingConfig
indexLoadingConfig, Schema schema)
throws Exception {
SegmentDirectoryLoader segmentDirectoryLoader =
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
@@ -446,6 +474,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
public void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
throws Exception {
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot add/replace segment:
%s of table: %s", segmentName,
+ _tableNameWithType);
if (localMetadata != null && hasSameCRC(zkMetadata, localMetadata)) {
LOGGER.info("Segment: {} of table: {} has crc: {} same as before,
already loaded, do nothing", segmentName,
_tableNameWithType, localMetadata.getCrc());
@@ -595,12 +626,13 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
// not thread safe. Caller should invoke it with safe concurrency control.
- protected void downloadFromPeersWithoutStreaming(String segmentName,
SegmentZKMetadata zkMetadata,
- File destTarFile) throws Exception {
+ protected void downloadFromPeersWithoutStreaming(String segmentName,
SegmentZKMetadata zkMetadata, File destTarFile)
+ throws Exception {
Preconditions.checkArgument(_tableDataManagerConfig.getTablePeerDownloadScheme()
!= null,
- "Download peers require non null peer download scheme");
- List<URI> peerSegmentURIs =
PeerServerSegmentFinder.getPeerServerURIs(segmentName,
- _tableDataManagerConfig.getTablePeerDownloadScheme(), _helixManager,
_tableNameWithType);
+ "Download peers require non null peer download scheme");
+ List<URI> peerSegmentURIs =
+ PeerServerSegmentFinder.getPeerServerURIs(segmentName,
_tableDataManagerConfig.getTablePeerDownloadScheme(),
+ _helixManager, _tableNameWithType);
if (peerSegmentURIs.isEmpty()) {
String msg = String.format("segment %s doesn't have any peers",
segmentName);
LOGGER.warn(msg);
@@ -611,10 +643,10 @@ public abstract class BaseTableDataManager implements
TableDataManager {
// Next download the segment from a randomly chosen server using
configured scheme.
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(peerSegmentURIs,
destTarFile, zkMetadata.getCrypterName());
LOGGER.info("Fetched segment {} from peers: {} to: {} of size: {}",
segmentName, peerSegmentURIs, destTarFile,
- destTarFile.length());
+ destTarFile.length());
} catch (AttemptsExceededException e) {
LOGGER.error("Attempts exceeded when downloading segment: {} for table:
{} from peers {} to: {}", segmentName,
- _tableNameWithType, peerSegmentURIs, destTarFile);
+ _tableNameWithType, peerSegmentURIs, destTarFile);
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1L);
throw e;
}
@@ -636,12 +668,12 @@ public abstract class BaseTableDataManager implements
TableDataManager {
String uri = zkMetadata.getDownloadUrl();
AtomicInteger attempts = new AtomicInteger(0);
try {
- File ret = SegmentFetcherFactory.fetchAndStreamUntarToLocal(uri,
tempRootDir, maxStreamRateInByte, attempts);
- _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
- attempts.get());
- LOGGER.info("Downloaded and untarred segment: {} for table: {} from:
{} attempts: {}", segmentName,
- _tableNameWithType, uri, attempts.get());
- return ret;
+ File ret = SegmentFetcherFactory.fetchAndStreamUntarToLocal(uri,
tempRootDir, maxStreamRateInByte, attempts);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
+ attempts.get());
+ LOGGER.info("Downloaded and untarred segment: {} for table: {} from: {}
attempts: {}", segmentName,
+ _tableNameWithType, uri, attempts.get());
+ return ret;
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.SEGMENT_STREAMED_DOWNLOAD_UNTAR_FAILURES,
attempts.get());
@@ -771,6 +803,10 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Override
public boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata) {
+ Preconditions.checkState(!_shutDown,
+ "Table data manager is already shut down, cannot load existing
segment: %s of table: %s", segmentName,
+ _tableNameWithType);
+
// Try to recover the segment from potential segment reloading failure.
String segmentTier = zkMetadata.getTier();
File indexDir = getSegmentDataDir(segmentName, segmentTier,
indexLoadingConfig.getTableConfig());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index bbc392d4d0..ffb5923411 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -74,12 +74,6 @@ public interface InstanceDataManager {
void deleteTable(String tableNameWithType)
throws Exception;
- /**
- * Adds a segment from local disk into an OFFLINE table.
- */
- void addOfflineSegment(String offlineTableName, String segmentName, File
indexDir)
- throws Exception;
-
/**
* Adds a segment into a REALTIME table.
* <p>The segment might be committed or under consuming.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index a9635f2a66..b78874a8e3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -117,8 +117,10 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
@Override
public void addSegment(ImmutableSegment immutableSegment) {
- super.addSegment(immutableSegment);
String segmentName = immutableSegment.getSegmentName();
+ Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
+ segmentName, _tableNameWithType);
+ super.addSegment(immutableSegment);
try {
if (loadLookupTable()) {
_logger.info("Successfully loaded lookup table after adding segment:
{}", segmentName);
@@ -134,6 +136,11 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
@Override
public void removeSegment(String segmentName) {
+ // Tolerate removeSegment() calls after shutdown (skip instead of throwing) so
that segment removal does not fail when the table is deleted
+ if (_shutDown) {
+ _logger.info("Table data manager is already shut down, skip removing
segment: {}", segmentName);
+ return;
+ }
super.removeSegment(segmentName);
try {
if (loadLookupTable()) {
@@ -150,6 +157,7 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
@Override
protected void doShutdown() {
+ releaseAndRemoveAllSegments();
closeDimensionTable(_dimensionTable.get());
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index 7e17da42cb..aef0d80b9b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -38,5 +38,6 @@ public class OfflineTableDataManager extends
BaseTableDataManager {
@Override
protected void doShutdown() {
+ releaseAndRemoveAllSegments();
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 3bf7caf24b..ebd5aee4f2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -54,7 +54,6 @@ import
org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManagerFactory;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -251,18 +250,14 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
if (_tableUpsertMetadataManager != null) {
// Stop the upsert metadata manager first to prevent removing metadata
when destroying segments
_tableUpsertMetadataManager.stop();
- for (SegmentDataManager segmentDataManager :
_segmentDataManagerMap.values()) {
- segmentDataManager.destroy();
- }
+ releaseAndRemoveAllSegments();
try {
_tableUpsertMetadataManager.close();
} catch (IOException e) {
_logger.warn("Cannot close upsert metadata manager properly for table:
{}", _tableNameWithType, e);
}
} else {
- for (SegmentDataManager segmentDataManager :
_segmentDataManagerMap.values()) {
- segmentDataManager.destroy();
- }
+ releaseAndRemoveAllSegments();
}
if (_leaseExtender != null) {
_leaseExtender.shutDown();
@@ -382,6 +377,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Override
public void addSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig, SegmentZKMetadata segmentZKMetadata)
throws Exception {
+ Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
+ segmentName, _tableNameWithType);
SegmentDataManager segmentDataManager =
_segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null) {
_logger.warn("Skipping adding existing segment: {} for table: {} with
data manager class: {}", segmentName,
@@ -438,8 +435,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
int partitionGroupId = llcSegmentName.getPartitionGroupId();
Semaphore semaphore =
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new
Semaphore(1));
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
- _tableUpsertMetadataManager != null ?
_tableUpsertMetadataManager.getOrCreatePartitionManager(
- partitionGroupId) : null;
+ _tableUpsertMetadataManager != null ?
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId)
+ : null;
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
@@ -488,6 +485,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Override
public void addSegment(ImmutableSegment immutableSegment) {
+ String segmentName = immutableSegment.getSegmentName();
+ Preconditions.checkState(!_shutDown, "Table data manager is already shut
down, cannot add segment: %s to table: %s",
+ segmentName, _tableNameWithType);
if (isUpsertEnabled()) {
handleUpsert(immutableSegment);
return;
@@ -648,17 +648,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
}
- /**
- * Replaces a committed HLC REALTIME segment.
- */
- public void replaceHLSegment(SegmentZKMetadata segmentZKMetadata,
IndexLoadingConfig indexLoadingConfig)
- throws Exception {
- ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
_tableNameWithType, segmentZKMetadata);
- File indexDir = new File(_indexDir, segmentZKMetadata.getSegmentName());
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
schema));
- }
-
/**
* Replaces a committed LLC REALTIME segment.
*/
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index da466e4e93..408558d7e5 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -591,8 +592,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
*/
protected List<File> unpackTarData(String tarFileName, File outputDir)
throws Exception {
- InputStream inputStream =
-
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
+ InputStream inputStream =
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
Assert.assertNotNull(inputStream);
return TarGzCompressionUtils.untar(inputStream, outputDir);
}
@@ -618,9 +618,8 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
String kafkaBroker = "localhost:" + getKafkaPort();
StreamDataProducer producer = null;
try {
- producer =
-
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
- getDefaultKafkaProducerProperties(kafkaBroker));
+ producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+ getDefaultKafkaProducerProperties(kafkaBroker));
ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic,
partitionColumnIndex, injectTombstones(),
producer);
} catch (Exception e) {
@@ -635,9 +634,8 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
String kafkaBroker = "localhost:" + getKafkaPort();
StreamDataProducer producer = null;
try {
- producer =
-
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
- getDefaultKafkaProducerProperties(kafkaBroker));
+ producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+ getDefaultKafkaProducerProperties(kafkaBroker));
ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, kafkaTopic,
partitionColumnIndex, injectTombstones(),
producer);
} catch (Exception e) {
@@ -734,11 +732,24 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String
tableName) {
final long countStarResult = getCountStarResult();
- TestUtils.waitForCondition(
- () -> getCurrentCountStarResult(tableName) == countStarResult, 100L,
timeoutMs,
+ TestUtils.waitForCondition(() -> getCurrentCountStarResult(tableName) ==
countStarResult, 100L, timeoutMs,
"Failed to load " + countStarResult + " documents", raiseError,
Duration.ofMillis(timeoutMs / 10));
}
+ /**
+ * Wait for servers to remove the table data manager after the table is
deleted.
+ */
+ protected void waitForTableDataManagerRemoved(String tableNameWithType) {
+ TestUtils.waitForCondition(aVoid -> {
+ for (BaseServerStarter serverStarter : _serverStarters) {
+ if
(serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType)
!= null) {
+ return false;
+ }
+ }
+ return true;
+ }, 60_000L, "Failed to remove table data manager for table: " +
tableNameWithType);
+ }
+
/**
* Reset table utils.
*/
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index ab79381b05..e850b4ba2b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -36,7 +36,6 @@ import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
-import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
@@ -82,25 +81,6 @@ public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrati
return DEFAULT_NUM_QUERIES_TO_GENERATE;
}
- /**
- * Test server table data manager deletion after the table is dropped
- */
- protected void cleanupTestTableDataManager(String tableNameWithType) {
- TestUtils.waitForCondition(aVoid -> {
- try {
- for (BaseServerStarter serverStarter : _serverStarters) {
- if
(serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType)
- != null) {
- return false;
- }
- }
- return true;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 600_000L, "Failed to delete table data managers");
- }
-
/**
* Test hard-coded queries.
* @throws Exception
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
index c95d5d7f2c..83adb4dfe9 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
@@ -180,7 +180,7 @@ public abstract class BaseRealtimeClusterIntegrationTest
extends BaseClusterInte
public void tearDown()
throws Exception {
dropRealtimeTable(getTableName());
-
cleanupTestTableDataManager(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+
waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
stopServer();
stopBroker();
stopController();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index e1ad0880c6..0debb9d09a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -432,7 +432,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
waitForNumOfSegmentsBecomeOnline(offlineTableName, 1);
dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
- cleanupTestTableDataManager(offlineTableName);
+ waitForTableDataManagerRemoved(offlineTableName);
}
private void waitForNumOfSegmentsBecomeOnline(String tableNameWithType, int
numSegments)
@@ -2386,15 +2386,13 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
for (int i = 2; i < 20; i++) {
query = String.format("SELECT distinctCountHLL(FlightNum, %d) FROM
mytable ", i);
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
expectedResults[i - 2]);
-
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
- expectedResults[i - 2]);
+
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
expectedResults[i - 2]);
}
// Default HLL is set as log2m=12
query = "SELECT distinctCountHLL(FlightNum) FROM mytable ";
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
expectedResults[10]);
-
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
- expectedResults[10]);
+
assertEquals(postQuery(query).get("resultTable").get("rows").get(0).get(0).asLong(),
expectedResults[10]);
}
@Test
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 45965f4bc4..2de9c586c0 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -61,8 +61,8 @@ public interface TableDataManager {
void start();
/**
- * Shuts down the table data manager. Should be called only once. After
calling shut down, no other method should be
- * called.
+ * Shuts down the table data manager. After calling shut down, no other
method should be called.
+ * NOTE: Shut down might be called multiple times. The implementation should
be able to handle that.
*/
void shutDown();
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index bdc4baf58d..d5c0c8afd8 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
@@ -42,7 +41,9 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
import org.apache.helix.model.ExternalView;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -213,26 +214,6 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
LOGGER.info("Helix instance data manager shut down");
}
- @Override
- public void addOfflineSegment(String offlineTableName, String segmentName,
File indexDir)
- throws Exception {
- LOGGER.info("Adding segment: {} to table: {}", segmentName,
offlineTableName);
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
- Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", offlineTableName);
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableConfig);
- SegmentZKMetadata zkMetadata =
- ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
offlineTableName, segmentName);
- Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata
for offline segment: %s, table: %s",
- segmentName, offlineTableName);
-
- IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
- indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
-
- _tableDataManagerMap.computeIfAbsent(offlineTableName, k ->
createTableDataManager(k, tableConfig))
- .addSegment(indexDir, indexLoadingConfig);
- LOGGER.info("Added segment: {} to table: {}", segmentName,
offlineTableName);
- }
-
@Override
public void addRealtimeSegment(String realtimeTableName, String segmentName)
throws Exception {
@@ -264,28 +245,43 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
@Override
public void deleteTable(String tableNameWithType)
throws Exception {
- // Wait externalview to converge
- long endTimeMs = System.currentTimeMillis() +
_externalViewDroppedMaxWaitMs;
- do {
- ExternalView externalView = _helixManager.getHelixDataAccessor()
-
.getProperty(_helixManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType));
- if (externalView == null) {
- LOGGER.info("ExternalView converged for the table to delete: {}",
tableNameWithType);
- _tableDataManagerMap.compute(tableNameWithType, (k, v) -> {
- if (v != null) {
- v.shutDown();
- LOGGER.info("Removed table: {}", tableNameWithType);
- } else {
- LOGGER.warn("Failed to find table data manager for table: {}, skip
removing the table", tableNameWithType);
- }
- return null;
- });
- return;
- }
- Thread.sleep(_externalViewDroppedCheckInternalMs);
- } while (System.currentTimeMillis() < endTimeMs);
- throw new TimeoutException(
- "Timeout while waiting for ExternalView to converge for the table to
delete: " + tableNameWithType);
+ TableDataManager tableDataManager =
_tableDataManagerMap.get(tableNameWithType);
+ if (tableDataManager == null) {
+ LOGGER.warn("Failed to find table data manager for table: {}, skip
deleting the table", tableNameWithType);
+ return;
+ }
+ LOGGER.info("Shutting down table data manager for table: {}",
tableNameWithType);
+ tableDataManager.shutDown();
+ LOGGER.info("Finished shutting down table data manager for table: {}",
tableNameWithType);
+
+ try {
+ // Wait for external view to disappear or become empty before removing
the table data manager.
+ //
+ // When creating the table, controller will check whether the external
view exists, and allow table creation only
+ // if it doesn't exist. If the table is recreated just after external
view disappeared, there is a small chance
+ // that server won't realize the external view is removed because it is
recreated before the server checks it. In
+ // order to handle this scenario, we want to remove the table data
manager when the external view exists but is
+ // empty.
+ HelixDataAccessor helixDataAccessor =
_helixManager.getHelixDataAccessor();
+ PropertyKey externalViewKey =
helixDataAccessor.keyBuilder().externalView(tableNameWithType);
+ long endTimeMs = System.currentTimeMillis() +
_externalViewDroppedMaxWaitMs;
+ do {
+ ExternalView externalView =
helixDataAccessor.getProperty(externalViewKey);
+ if (externalView == null) {
+ LOGGER.info("ExternalView is dropped for table: {}",
tableNameWithType);
+ return;
+ }
+ if (externalView.getRecord().getMapFields().isEmpty()) {
+ LOGGER.info("ExternalView is empty for table: {}",
tableNameWithType);
+ return;
+ }
+ Thread.sleep(_externalViewDroppedCheckInternalMs);
+ } while (System.currentTimeMillis() < endTimeMs);
+ LOGGER.warn("ExternalView still exists after {}ms for table: {}",
_externalViewDroppedMaxWaitMs,
+ tableNameWithType);
+ } finally {
+ _tableDataManagerMap.remove(tableNameWithType);
+ }
}
@Override
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index b85d13b953..510f4fa74d 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -29,8 +29,6 @@ import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import
org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -75,49 +73,61 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
@Transition(from = "OFFLINE", to = "CONSUMING")
public void onBecomeConsumingFromOffline(Message message,
NotificationContext context) {
-
Preconditions.checkState(SegmentName.isLowLevelConsumerSegmentName(message.getPartitionName()),
- "Tried to go into CONSUMING state on non-low level segment");
_logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : "
+ message);
- // We do the same processing as usual for going to the consuming state,
which adds the segment to the table data
- // manager and starts Kafka consumption
- onBecomeOnlineFromOffline(message, context);
+ String realtimeTableName = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ try {
+ _instanceDataManager.addRealtimeSegment(realtimeTableName,
segmentName);
+ } catch (Exception e) {
+ String errorMessage =
+ String.format("Caught exception in state transition OFFLINE ->
CONSUMING for table: %s, segment: %s",
+ realtimeTableName, segmentName);
+ _logger.error(errorMessage, e);
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(realtimeTableName);
+ if (tableDataManager != null) {
+ tableDataManager.addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage,
e));
+ }
+ Utils.rethrowException(e);
+ }
}
@Transition(from = "CONSUMING", to = "ONLINE")
public void onBecomeOnlineFromConsuming(Message message,
NotificationContext context) {
+
_logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : "
+ message);
String realtimeTableName = message.getResourceName();
- String segmentNameStr = message.getPartitionName();
- LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
-
+ String segmentName = message.getPartitionName();
TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(realtimeTableName);
- Preconditions.checkNotNull(tableDataManager);
- tableDataManager.onConsumingToOnline(segmentNameStr);
- SegmentDataManager acquiredSegment =
tableDataManager.acquireSegment(segmentNameStr);
+ Preconditions.checkState(tableDataManager != null, "Failed to find
table: %s", realtimeTableName);
+ tableDataManager.onConsumingToOnline(segmentName);
+ SegmentDataManager acquiredSegment =
tableDataManager.acquireSegment(segmentName);
// For this transition to be correct in helix, we should already have a
segment that is consuming
- if (acquiredSegment == null) {
- throw new RuntimeException("Segment " + segmentNameStr + " + not
present ");
- }
+ Preconditions.checkState(acquiredSegment != null, "Failed to find
segment: %s in table: %s", segmentName,
+ realtimeTableName);
// TODO: https://github.com/apache/pinot/issues/10049
try {
if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) {
// We found an LLC segment that is not consuming right now, must be
that we already swapped it with a
// segment that has been built. Nothing to do for this state
transition.
- _logger
- .info("Segment {} not an instance of
LLRealtimeSegmentDataManager. Reporting success for the transition",
- acquiredSegment.getSegmentName());
+ _logger.info(
+ "Segment {} not an instance of LLRealtimeSegmentDataManager.
Reporting success for the transition",
+ acquiredSegment.getSegmentName());
return;
}
LLRealtimeSegmentDataManager segmentDataManager =
(LLRealtimeSegmentDataManager) acquiredSegment;
- SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
- .getSegmentZKMetadata(_instanceDataManager.getPropertyStore(),
realtimeTableName, segmentNameStr);
+ SegmentZKMetadata segmentZKMetadata =
+
ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(),
realtimeTableName,
+ segmentName);
segmentDataManager.goOnlineFromConsuming(segmentZKMetadata);
- } catch (InterruptedException e) {
- String errorMessage = String.format("State transition interrupted for
segment %s.", segmentNameStr);
- _logger.warn(errorMessage, e);
- tableDataManager
- .addSegmentError(segmentNameStr, new
SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ String errorMessage =
+ String.format("Caught exception in state transition CONSUMING ->
ONLINE for table: %s, segment: %s",
+ realtimeTableName, segmentName);
+ _logger.error(errorMessage, e);
+ tableDataManager.addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ Utils.rethrowException(e);
} finally {
tableDataManager.releaseSegment(acquiredSegment);
}
@@ -131,7 +141,7 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
try {
_instanceDataManager.offloadSegment(realtimeTableName, segmentName);
} catch (Exception e) {
- _logger.error("Caught exception in state transition from CONSUMING ->
OFFLINE for resource: {}, partition: {}",
+ _logger.error("Caught exception in state transition CONSUMING ->
OFFLINE for table: {}, segment: {}",
realtimeTableName, segmentName, e);
Utils.rethrowException(e);
}
@@ -141,15 +151,16 @@ public class SegmentOnlineOfflineStateModelFactory
extends StateModelFactory<Sta
public void onBecomeDroppedFromConsuming(Message message,
NotificationContext context) {
_logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : "
+ message);
String realtimeTableName = message.getResourceName();
- String segmentNameStr = message.getPartitionName();
+ String segmentName = message.getPartitionName();
TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(realtimeTableName);
- Preconditions.checkNotNull(tableDataManager);
- tableDataManager.onConsumingToDropped(segmentNameStr);
+ Preconditions.checkState(tableDataManager != null, "Failed to find
table: %s", realtimeTableName);
+ tableDataManager.onConsumingToDropped(segmentName);
try {
- onBecomeOfflineFromConsuming(message, context);
- onBecomeDroppedFromOffline(message, context);
- } catch (final Exception e) {
- _logger.error("Caught exception on CONSUMING -> DROPPED state
transition", e);
+ _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+ _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition CONSUMING ->
DROPPED for table: {}, segment: {}",
+ realtimeTableName, segmentName, e);
Utils.rethrowException(e);
}
}
@@ -160,7 +171,7 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
String tableNameWithType = message.getResourceName();
String segmentName = message.getPartitionName();
try {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(message.getResourceName());
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
Preconditions.checkNotNull(tableType);
if (tableType == TableType.OFFLINE) {
_instanceDataManager.addOrReplaceSegment(tableNameWithType,
segmentName);
@@ -168,14 +179,14 @@ public class SegmentOnlineOfflineStateModelFactory
extends StateModelFactory<Sta
_instanceDataManager.addRealtimeSegment(tableNameWithType,
segmentName);
}
} catch (Exception e) {
- String errorMessage = String
- .format("Caught exception in state transition from OFFLINE ->
ONLINE for resource: %s, partition: %s",
+ String errorMessage =
+ String.format("Caught exception in state transition OFFLINE ->
ONLINE for table: %s, segment: %s",
tableNameWithType, segmentName);
_logger.error(errorMessage, e);
TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager != null) {
- tableDataManager
- .addSegmentError(segmentName, new
SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ tableDataManager.addSegmentError(segmentName,
+ new SegmentErrorInfo(System.currentTimeMillis(), errorMessage,
e));
}
Utils.rethrowException(e);
}
@@ -191,7 +202,7 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
try {
_instanceDataManager.offloadSegment(tableNameWithType, segmentName);
} catch (Exception e) {
- _logger.error("Caught exception in state transition from ONLINE ->
OFFLINE for resource: {}, partition: {}",
+ _logger.error("Caught exception in state transition ONLINE -> OFFLINE
for table: {}, segment: {}",
tableNameWithType, segmentName, e);
Utils.rethrowException(e);
}
@@ -205,8 +216,9 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
String segmentName = message.getPartitionName();
try {
_instanceDataManager.deleteSegment(tableNameWithType, segmentName);
- } catch (final Exception e) {
- _logger.error("Cannot drop the segment : " + segmentName + " from
server!\n" + e.getMessage(), e);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition OFFLINE -> DROPPED
for table: {}, segment: {}",
+ tableNameWithType, segmentName, e);
Utils.rethrowException(e);
}
}
@@ -214,18 +226,35 @@ public class SegmentOnlineOfflineStateModelFactory
extends StateModelFactory<Sta
@Transition(from = "ONLINE", to = "DROPPED")
public void onBecomeDroppedFromOnline(Message message, NotificationContext
context) {
_logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline()
: " + message);
+ String tableNameWithType = message.getResourceName();
+ String segmentName = message.getPartitionName();
try {
- onBecomeOfflineFromOnline(message, context);
- onBecomeDroppedFromOffline(message, context);
- } catch (final Exception e) {
- _logger.error("Caught exception on ONLINE -> DROPPED state
transition", e);
+ _instanceDataManager.offloadSegment(tableNameWithType, segmentName);
+ _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition ONLINE -> DROPPED
for table: {}, segment: {}",
+ tableNameWithType, segmentName, e);
Utils.rethrowException(e);
}
}
@Transition(from = "ERROR", to = "OFFLINE")
public void onBecomeOfflineFromError(Message message, NotificationContext
context) {
- _logger.info("Resetting the state for segment:{} from ERROR to OFFLINE",
message.getPartitionName());
+ _logger.info("SegmentOnlineOfflineStateModel.onBecomeOfflineFromError()
: " + message);
+ }
+
+ @Transition(from = "ERROR", to = "DROPPED")
+ public void onBecomeDroppedFromError(Message message, NotificationContext
context) {
+ _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromError()
: " + message);
+ String tableNameWithType = message.getResourceName();
+ String segmentName = message.getPartitionName();
+ try {
+ _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+ } catch (Exception e) {
+ _logger.error("Caught exception in state transition ERROR -> DROPPED
for table: {}, segment: {}",
+ tableNameWithType, segmentName, e);
+ Utils.rethrowException(e);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]