Jackie-Jiang commented on code in PR #9306:
URL: https://github.com/apache/pinot/pull/9306#discussion_r962069418
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java:
##########
@@ -47,6 +51,28 @@ public static boolean shouldRelocateToTiers(TableConfig tableConfig) {
     return CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList());
   }
+  public static String getDataDirFromTierConfig(String tableNameWithType, String tierName, TableConfig tableConfig) {
+    List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
+    Preconditions
+        .checkState(tierCfgs != null && !tierCfgs.isEmpty(), "No tierConfigs for table: %s", tableNameWithType);
Review Comment:
(minor) Use `CollectionUtils.isNotEmpty(tierCfgs)` for the null-and-empty check.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/MultiDirSegmentDirectoryLoader.java:
##########
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.loader;
+
+import java.io.File;
+import java.net.URI;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.config.TierConfigUtils;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
+import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.loader.SegmentLoader;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of {@link SegmentDirectoryLoader} that can move segments across data dirs configured as storage tiers.
+ */
+@SegmentLoader(name = "multidir")
+public class MultiDirSegmentDirectoryLoader implements SegmentDirectoryLoader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MultiDirSegmentDirectoryLoader.class);
+
+  /**
+   * Creates and loads the {@link SegmentLocalFSDirectory} which is the default implementation of
+   * {@link SegmentDirectory}
+   * @param indexDir the current segment index directory
+   * @param segmentLoaderContext context for instantiation of the SegmentDirectory. The target tier set in context is
+   *                             used to decide which data directory to keep the segment data.
+   * @return instance of {@link SegmentLocalFSDirectory}
+   */
+  @Override
+  public SegmentDirectory load(URI indexDir, SegmentDirectoryLoaderContext segmentLoaderContext)
+      throws Exception {
+    File srcDir = new File(indexDir);
+    String segmentName = segmentLoaderContext.getSegmentName();
+    String segmentTier = segmentLoaderContext.getSegmentTier();
+    File destDir = getTargetDataDir(segmentTier, segmentLoaderContext);
+    if (destDir == null) {
+      destDir = getDefaultDataDir(segmentLoaderContext);
+      LOGGER.info("Use destDir: {} on default tier for segment: {} as no dataDir for target tier: {}", destDir,
+          segmentName, segmentTier);
+      segmentTier = null;
+    }
+    if (srcDir.equals(destDir)) {
+      LOGGER.info("Keep segment: {} in current dataDir: {} on current tier: {}", segmentName, destDir, segmentTier);
+    } else {
+      LOGGER.info("Move segment: {} from srcDir: {} to destDir: {} on tier: {}", segmentName, srcDir, destDir,
+          segmentTier);
+      if (destDir.exists()) {
+        LOGGER.warn("The destDir: {} exists on tier: {} and cleans it firstly", destDir, segmentTier);
+        FileUtils.deleteQuietly(destDir);
+      }
+      FileUtils.moveDirectory(srcDir, destDir);
Review Comment:
Will this throw an exception when `srcDir` does not exist?
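
To make the concern concrete, here is a minimal sketch of guarding the move so that a missing source directory is a no-op instead of a failure. `SegmentDirMover` and `moveIfPresent` are hypothetical names, not part of the PR, and the sketch uses `java.nio.file` in place of commons-io's `FileUtils.moveDirectory` so that it stays self-contained. It assumes the destination does not exist yet (the PR deletes it beforehand) and that both directories are on the same file store.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration only; not code from the PR.
final class SegmentDirMover {
  private SegmentDirMover() {
  }

  /**
   * Moves srcDir to destDir if srcDir is an existing directory.
   * Returns true if the move happened, false if srcDir was missing and nothing was done.
   * Assumes destDir does not exist yet; Files.move fails otherwise.
   */
  static boolean moveIfPresent(Path srcDir, Path destDir)
      throws IOException {
    if (!Files.isDirectory(srcDir)) {
      // Nothing to move, e.g. the segment has not been downloaded to this dir yet.
      return false;
    }
    Path parent = destDir.getParent();
    if (parent != null) {
      // Make sure the tier's table directory exists before moving into it.
      Files.createDirectories(parent);
    }
    Files.move(srcDir, destDir);
    return true;
  }
}
```

Whether a missing `srcDir` should be skipped silently or reported as an error is a design decision for the PR; the sketch only shows the skipping variant.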
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -546,6 +553,31 @@ File getSegmentDataDir(String segmentName) {
     return new File(_indexDir, segmentName);
   }
+  @VisibleForTesting
+  File getSegmentDataDir(String segmentName, String segmentTier, IndexLoadingConfig indexLoadingConfig) {
Review Comment:
(minor)
```suggestion
  File getSegmentDataDir(String segmentName, @Nullable String segmentTier, IndexLoadingConfig indexLoadingConfig) {
```
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -642,44 +675,44 @@ private boolean tryLoadExistingSegment(String segmentName, IndexLoadingConfig in
         // Close the stale SegmentDirectory object and recreate it with reprocessed segment.
         closeSegmentDirectoryQuietly(segmentDirectory);
         ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, schema);
-        segmentDirectory =
-            initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig, schema);
+        segmentDirectory = initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), zkMetadata.getTier(),
+            indexLoadingConfig, schema);
       }
       ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig, schema);
       addSegment(segment);
-      LOGGER.info("Loaded existing segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
-          zkMetadata.getCrc());
+      LOGGER.info("Loaded existing segment: {} of table: {} with crc: {} on tier: {}", segmentName, _tableNameWithType,
+          zkMetadata.getCrc(), zkMetadata.getTier());
       return true;
     } catch (Exception e) {
-      LOGGER.error("Failed to load existing segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
-          zkMetadata.getCrc(), e);
+      LOGGER.error("Failed to load existing segment: {} of table: {} with crc: {} on tier: {}", segmentName,
+          _tableNameWithType, zkMetadata.getCrc(), zkMetadata.getTier(), e);
       closeSegmentDirectoryQuietly(segmentDirectory);
       return false;
     }
   }
-  private SegmentDirectory tryInitSegmentDirectory(String segmentName, String segmentCrc,
+  private SegmentDirectory tryInitSegmentDirectory(String segmentName, String segmentCrc, String segmentTier,
       IndexLoadingConfig indexLoadingConfig, Schema schema) {
     try {
-      return initSegmentDirectory(segmentName, segmentCrc, indexLoadingConfig, schema);
+      return initSegmentDirectory(segmentName, segmentCrc, segmentTier, indexLoadingConfig, schema);
     } catch (Exception e) {
-      LOGGER.warn("Failed to initialize SegmentDirectory for segment: {} of table: {} with error: {}", segmentName,
-          _tableNameWithType, e.getMessage());
+      LOGGER.warn("Failed to initialize SegmentDirectory for segment: {} of table: {} on tier: {} with error: {}",
+          segmentName, _tableNameWithType, segmentTier, e.getMessage());
       return null;
     }
   }
-  private SegmentDirectory initSegmentDirectory(String segmentName, String segmentCrc,
+  private SegmentDirectory initSegmentDirectory(String segmentName, String segmentCrc, String segmentTier,
       IndexLoadingConfig indexLoadingConfig, Schema schema)
       throws Exception {
     SegmentDirectoryLoaderContext loaderContext =
         new SegmentDirectoryLoaderContext.Builder().setTableConfig(indexLoadingConfig.getTableConfig())
-            .setSchema(schema).setInstanceId(indexLoadingConfig.getInstanceId()).setSegmentName(segmentName)
-            .setSegmentCrc(segmentCrc).setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs())
-            .build();
+            .setSchema(schema).setInstanceId(indexLoadingConfig.getInstanceId()).setTableDataDir(_tableDataDir)
+            .setSegmentName(segmentName).setSegmentCrc(segmentCrc).setSegmentTier(segmentTier)
+            .setSegmentDirectoryConfigs(indexLoadingConfig.getSegmentDirectoryConfigs()).build();
     SegmentDirectoryLoader segmentDirectoryLoader =
         SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
-    File indexDir = getSegmentDataDir(segmentName);
+    File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig);
     return segmentDirectoryLoader.load(indexDir.toURI(), loaderContext);
Review Comment:
Am I missing something? The source dir computed here is the final dir after
applying the tier, but the segment files might not have been moved there yet (the
move logic lives inside `load`). Won't this cause a chicken-and-egg problem?
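
One possible way around the ordering issue, sketched under assumptions: resolve the source from wherever the segment currently sits on disk, and use the target tier only for the destination. `SegmentDirResolver` and `findExistingSegmentDir` are hypothetical names, and the candidate list (for example the default table data dir plus each tier's table data dir) is an assumption rather than something the PR defines.

```java
import java.io.File;
import java.util.List;

// Hypothetical helper for illustration only; not code from the PR.
final class SegmentDirResolver {
  private SegmentDirResolver() {
  }

  /**
   * Returns the first directory under the candidate table data dirs that currently
   * holds the segment, or null if the segment is not on disk in any of them.
   */
  static File findExistingSegmentDir(List<File> candidateTableDataDirs, String segmentName) {
    for (File tableDataDir : candidateTableDataDirs) {
      File segmentDir = new File(tableDataDir, segmentName);
      if (segmentDir.isDirectory()) {
        return segmentDir;
      }
    }
    return null;
  }
}
```

With something like this, the loader would receive the directory where the segment really is as `indexDir`, and could then decide on its own whether a move to the target tier's directory is needed.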
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/loader/SegmentDirectoryLoaderContext.java:
##########
@@ -75,6 +87,8 @@ public static class Builder {
private String _instanceId;
private String _segmentName;
private String _segmentCrc;
+ private String _segmentTier;
+ private String _tableDataDir;
Review Comment:
(minor) Keep the fields in the same order as the constructor parameters.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/loader/MultiDirSegmentDirectoryLoader.java:
##########
@@ -0,0 +1,110 @@
+/**
+ * Implementation of {@link SegmentDirectoryLoader} that can move segments across data dirs configured as storage tiers.
+ */
+@SegmentLoader(name = "multidir")
+public class MultiDirSegmentDirectoryLoader implements SegmentDirectoryLoader {
Review Comment:
Should we consider calling it `TierBasedSegmentDirectoryLoader`?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -406,9 +410,12 @@ public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoad
           localMetadata.getCrc(), zkMetadata.getCrc());
     }
     File indexDir = downloadSegment(segmentName, zkMetadata);
-    addSegment(indexDir, indexLoadingConfig);
-    LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
-        zkMetadata.getCrc());
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Review Comment:
We might want to remove `addSegment(indexDir, indexLoadingConfig)`: it doesn't
take all the required arguments, so it can cause unexpected behavior when
people call it.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -546,6 +553,31 @@ File getSegmentDataDir(String segmentName) {
     return new File(_indexDir, segmentName);
   }
+  @VisibleForTesting
+  File getSegmentDataDir(String segmentName, String segmentTier, IndexLoadingConfig indexLoadingConfig) {
+    if (segmentTier == null) {
+      return getSegmentDataDir(segmentName);
+    }
+    try {
+      String tierDataDir = TierConfigUtils
+          .getDataDirFromTierConfig(_tableNameWithType, segmentTier, indexLoadingConfig.getTableConfig());
+      File tierTableDataDir = new File(tierDataDir, _tableNameWithType);
+      return new File(tierTableDataDir, segmentName);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to get dataDir for segment: {} of table: {} on tier: {} due to error: {}", segmentName,
+          _tableNameWithType, segmentTier, e.getMessage());
+      return getSegmentDataDir(segmentName);
+    }
+  }
+
+  private String getSegmentCurrentTier(String segmentName) {
Review Comment:
(minor) Annotate the return value as `@Nullable`.