This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1e58df7713 [bugfix] Do not move real-time segments to working dir on restart (#11226)
1e58df7713 is described below
commit 1e58df771379e009e51518231462da8a8540129e
Author: Alexey Pavlenko <[email protected]>
AuthorDate: Wed Aug 2 07:43:47 2023 +0300
[bugfix] Do not move real-time segments to working dir on restart (#11226)
---
.../core/data/manager/BaseTableDataManager.java | 1 +
.../manager/realtime/RealtimeTableDataManager.java | 5 ++
.../realtime/RealtimeTableDataManagerTest.java | 53 ++++++++++++++++++++++
.../starter/helix/HelixInstanceDataManager.java | 10 +++-
4 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 04059f94c1..0187fb6836 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -233,6 +233,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
throws Exception {
indexLoadingConfig.setTableDataDir(_tableDataDir);
+
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
indexLoadingConfig.getSchema()));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 41c338d209..f431b63eb9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -390,6 +390,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
return;
}
+ // Assign table directory and tier info to not let the segment be moved
during loading/preprocessing
+ indexLoadingConfig.setTableDataDir(_tableDataDir);
+
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
+ indexLoadingConfig.setSegmentTier(segmentZKMetadata.getTier());
+
File segmentDir = new File(_indexDir, segmentName);
// Restart during segment reload might leave segment in inconsistent state
(index directory might not exist but
// segment backup directory existed), need to first try to recover from
reload failure before checking the existence
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index 45196e2730..e865e44c5e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -157,6 +157,59 @@ public class RealtimeTableDataManagerTest {
assertEquals(llmd.getTotalDocs(), 5);
}
+ @Test
+ public void testAddSegmentDefaultTierByTierBasedDirLoader()
+ throws Exception {
+ RealtimeTableDataManager tmgr1 = new RealtimeTableDataManager(null);
+ TableDataManagerConfig tableDataManagerConfig =
createTableDataManagerConfig();
+ ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+ TableConfig tableConfig = setupTableConfig(propertyStore);
+ Schema schema = setupSchema(propertyStore);
+ tmgr1.init(tableDataManagerConfig, "server01", propertyStore,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
+ new TableDataManagerParams(0, false, -1));
+
+ // Create a raw segment and put it in deep store backed by local fs.
+ String segName = "seg_tiered_01";
+ SegmentZKMetadata segmentZKMetadata =
+ TableDataManagerTestUtils.makeRawSegment(segName,
createSegment(tableConfig, schema, segName),
+ new File(TEMP_DIR, segName +
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
+ segmentZKMetadata.setStatus(Status.DONE);
+
+ // Local segment dir doesn't exist, thus downloading from deep store.
+ File localSegDir = new File(TABLE_DATA_DIR, segName);
+ assertFalse(localSegDir.exists());
+
+ // Add segment
+ IndexLoadingConfig indexLoadingConfig =
+ TableDataManagerTestUtils.createIndexLoadingConfig("tierBased",
tableConfig, schema);
+ tmgr1.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
+ assertTrue(localSegDir.exists());
+ SegmentMetadataImpl llmd = new SegmentMetadataImpl(new
File(TABLE_DATA_DIR, segName));
+ assertEquals(llmd.getTotalDocs(), 5);
+
+ // Now, repeat initialization of the table data manager
+ tmgr1.shutDown();
+ RealtimeTableDataManager tmgr2 = new RealtimeTableDataManager(null);
+ tableDataManagerConfig = createTableDataManagerConfig();
+ propertyStore = mock(ZkHelixPropertyStore.class);
+ tableConfig = setupTableConfig(propertyStore);
+ schema = setupSchema(propertyStore);
+ tmgr2.init(tableDataManagerConfig, "server01", propertyStore,
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
mock(HelixManager.class), null, null,
+ new TableDataManagerParams(0, false, -1));
+
+ // Reinitialize index loading config and try adding the segment
+ indexLoadingConfig =
+ TableDataManagerTestUtils.createIndexLoadingConfig("tierBased",
tableConfig, schema);
+ tmgr2.addSegment(segName, indexLoadingConfig, segmentZKMetadata);
+
+ // Make sure that the segment hasn't been moved
+ assertTrue(localSegDir.exists());
+ llmd = new SegmentMetadataImpl(new File(TABLE_DATA_DIR, segName));
+ assertEquals(llmd.getTotalDocs(), 5);
+ }
+
@Test
public void testAllowDownload() {
RealtimeTableDataManager mgr = new RealtimeTableDataManager(null);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index f01d13ea94..bdc4baf58d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -220,8 +220,16 @@ public class HelixInstanceDataManager implements InstanceDataManager {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", offlineTableName);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableConfig);
+ SegmentZKMetadata zkMetadata =
+ ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
offlineTableName, segmentName);
+ Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata
for offline segment: %s, table: %s",
+ segmentName, offlineTableName);
+
+ IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
+ indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
+
_tableDataManagerMap.computeIfAbsent(offlineTableName, k ->
createTableDataManager(k, tableConfig))
- .addSegment(indexDir, new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema));
+ .addSegment(indexDir, indexLoadingConfig);
LOGGER.info("Added segment: {} to table: {}", segmentName,
offlineTableName);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]