This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7af2bac942 Reload conditional skip (#10089)
7af2bac942 is described below
commit 7af2bac942c64ac16ab7be3cb5b24461574b37bc
Author: Saurabh Dubey <[email protected]>
AuthorDate: Fri Jan 27 02:51:42 2023 +0530
Reload conditional skip (#10089)
---
.../core/data/manager/BaseTableDataManager.java | 34 ++++++++++++++++++----
.../data/manager/BaseTableDataManagerTest.java | 2 --
.../loader/TierBasedSegmentDirectoryLoader.java | 6 ++++
.../segment/index/loader/SegmentPreProcessor.java | 7 +++--
.../segment/spi/loader/SegmentDirectoryLoader.java | 9 ++++++
5 files changed, 48 insertions(+), 10 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index ff5a7693f6..6ce2c3e451 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -355,14 +355,13 @@ public abstract class BaseTableDataManager implements TableDataManager {
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
File indexDir = getSegmentDataDir(segmentName, segmentTier,
indexLoadingConfig.getTableConfig());
try {
- // Create backup directory to handle failure of segment reloading.
- createBackup(indexDir);
-
// Download segment from deep store if CRC changes or forced to download;
// otherwise, copy backup directory back to the original index directory.
// And then continue to load the segment from the index directory.
boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata,
localMetadata);
if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
+ // Create backup directory to handle failure of segment reloading.
+ createBackup(indexDir);
if (forceDownload) {
LOGGER.info("Segment: {} of table: {} is forced to download",
segmentName, _tableNameWithType);
} else {
@@ -373,12 +372,27 @@ public abstract class BaseTableDataManager implements TableDataManager {
} else {
LOGGER.info("Reload existing segment: {} of table: {} on tier: {}",
segmentName, _tableNameWithType,
TierConfigUtils.normalizeTierName(segmentTier));
+ SegmentDirectory segmentDirectory =
+ initSegmentDirectory(segmentName,
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
+ // We should first try to reuse existing segment directory
+ if (canReuseExistingDirectoryForReload(zkMetadata, segmentTier,
segmentDirectory, indexLoadingConfig,
+ schema)) {
+ LOGGER.info("Reloading segment: {} of table: {} using existing
segment directory as no reprocessing needed",
+ segmentName, _tableNameWithType);
+ // No reprocessing needed, reuse the same segment
+ ImmutableSegment segment =
ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig, schema);
+ addSegment(segment);
+ return;
+ }
+ // Create backup directory to handle failure of segment reloading.
+ createBackup(indexDir);
// The indexDir is empty after calling createBackup, as it's renamed
to a backup directory.
// The SegmentDirectory should initialize accordingly. Like for
SegmentLocalFSDirectory, it
// doesn't load anything from an empty indexDir, but gets the info to
complete the copyTo.
- try (SegmentDirectory segmentDirectory =
initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()),
- indexLoadingConfig)) {
+ try {
segmentDirectory.copyTo(indexDir);
+ } finally {
+ segmentDirectory.close();
}
}
@@ -403,6 +417,16 @@ public abstract class BaseTableDataManager implements TableDataManager {
}
}
+ private boolean canReuseExistingDirectoryForReload(SegmentZKMetadata
segmentZKMetadata,
+ String currentSegmentTier, SegmentDirectory segmentDirectory,
IndexLoadingConfig indexLoadingConfig,
+ Schema schema)
+ throws Exception {
+ SegmentDirectoryLoader segmentDirectoryLoader =
+
SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+ return
!segmentDirectoryLoader.needsTierMigration(segmentZKMetadata.getTier(),
currentSegmentTier)
+ && !ImmutableSegmentLoader.needPreprocess(segmentDirectory,
indexLoadingConfig, schema);
+ }
+
@Override
public void addOrReplaceSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig,
SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 6437a54a70..b4cc98cbf1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -191,7 +191,6 @@ public class BaseTableDataManagerTest {
fail();
} catch (Exception e) {
// As expected, segment reloading fails due to missing the local segment
dir.
- assertTrue(e.getMessage().contains("does not exist or is not a
directory"));
}
}
@@ -312,7 +311,6 @@ public class BaseTableDataManagerTest {
fail();
} catch (Exception e) {
// As expected, segment reloading fails due to missing the local segment
dir.
- assertTrue(e.getMessage().contains("does not exist or is not a
directory"));
}
tmgr.reloadSegment(segName,
TableDataManagerTestUtils.createIndexLoadingConfig(), zkmd, llmd, null, true);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
index 170ce710e4..e23e73df46 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/TierBasedSegmentDirectoryLoader.java
@@ -26,6 +26,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
@@ -216,4 +217,9 @@ public class TierBasedSegmentDirectoryLoader implements SegmentDirectoryLoader {
return null;
}
}
+
+ @Override
+ public boolean needsTierMigration(String targetTier, String currentTier) {
+ return !StringUtils.equals(targetTier, currentTier);
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 1f26259558..85a86baf58 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -207,9 +207,10 @@ public class SegmentPreProcessor implements AutoCloseable {
List<StarTreeV2Metadata> starTreeMetadataList =
_segmentMetadata.getStarTreeV2MetadataList();
// There are existing star-trees, but if they match the builder configs
exactly,
// then there is no need to generate the star-trees
- if (starTreeMetadataList != null && !StarTreeBuilderUtils
- .shouldRemoveExistingStarTrees(starTreeBuilderConfigs,
starTreeMetadataList)) {
- return false;
+
+ // We need reprocessing if existing configs are to be removed, or new
configs have been added
+ if (starTreeMetadataList != null) {
+ return
StarTreeBuilderUtils.shouldRemoveExistingStarTrees(starTreeBuilderConfigs,
starTreeMetadataList);
}
return !starTreeBuilderConfigs.isEmpty();
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoader.java
index 591c0997b5..cf945f83f5 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoader.java
@@ -42,4 +42,13 @@ public interface SegmentDirectoryLoader {
default void delete(SegmentDirectoryLoaderContext
segmentDirectoryLoaderContext)
throws Exception {
}
+
+ /**
+ * Based on the zkMetadata's and current segment tier, checks whether or not
tier migration is needed
+ * @param targetTier segment's ZKMetadata's tier
+ * @param currentTier Current segment tier
+ */
+ default boolean needsTierMigration(String targetTier, String currentTier) {
+ return false;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]