This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 90fc9285b3 Ensure SegmentDeletionManager deletes deepstore files
created by BaseMultipleSegmentsConversionExecutor (#15048)
90fc9285b3 is described below
commit 90fc9285b3e34e231c6ba3574a4b414cf7596ac4
Author: 9aman <[email protected]>
AuthorDate: Fri Feb 14 11:01:14 2025 +0530
Ensure SegmentDeletionManager deletes deepstore files created by
BaseMultipleSegmentsConversionExecutor (#15048)
* Rely on the segment ZK metadata download URL when deleting a segment in the
deletion manager, using the existing naming scheme only as a fallback
* Fix integration tests
* Change the approach to probe a fixed .tar.gz extension instead of
reading segment ZK metadata
* Minor logging improvements
* Add logs
* Improve Javadocs
* Fix Javadocs and minor log messages
---
.../helix/core/SegmentDeletionManager.java | 44 +++++++++-
.../pinot/controller/api/TableViewsTest.java | 9 +-
.../PinotHelixResourceManagerStatelessTest.java | 98 +++++++++++++++-------
.../core/util/SegmentDeletionManagerTest.java | 87 +++++++++++++++++++
4 files changed, 205 insertions(+), 33 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 5a7012657e..d1d5544be7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.LeadControllerManager;
import
org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
@@ -235,7 +236,11 @@ public class SegmentDeletionManager {
long retentionMs = deletedSegmentsRetentionMs == null
? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs;
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName,
URIUtils.encode(segmentId));
+ URI fileToDeleteURI = getFileToDeleteURI(rawTableName, segmentId);
+ if (fileToDeleteURI == null) {
+ LOGGER.warn("No segment file found for segment: {} in deep store,
skipping deletion", segmentId);
+ return;
+ }
PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
// Segment metadata in remote store is an optimization, to avoid
downloading segment to parse metadata.
// This is catch all clean up to ensure that metadata is removed from
deep store.
@@ -282,6 +287,43 @@ public class SegmentDeletionManager {
}
}
+  /**
+   * Resolves the deep-store URI of the segment file to delete by probing, in order, two naming variants under
+   * {@code _dataDir/rawTableName}:
+   * - {@code segmentId} without extension (conventional naming)
+   * - {@code segmentId + ".tar.gz"} (used by minions in BaseMultipleSegmentsConversionExecutor)
+   *
+   * @param rawTableName raw table name (without type suffix); names the table directory in deep store
+   * @param segmentId name of the segment; URL-encoded via {@link URIUtils#encode} when building the URI
+   * @return URI of the first variant that exists, or {@code null} if neither exists or the lookup throws; in the
+   *         latter case the exception is logged together with its stack trace
+   */
+  @Nullable
+  private URI getFileToDeleteURI(String rawTableName, String segmentId) {
+    try {
+      URI plainFileUri = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
+      PinotFS pinotFS = PinotFSFactory.create(plainFileUri.getScheme());
+
+      // Check for the plain (extension-less) segment file first
+      if (pinotFS.exists(plainFileUri)) {
+        return plainFileUri;
+      }
+
+      URI tarGzFileUri = URIUtils.getUri(_dataDir, rawTableName,
+          URIUtils.encode(segmentId + TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
+
+      // Fall back to the .tar.gz variant; it lives under the same _dataDir, so the same PinotFS is reused
+      if (pinotFS.exists(tarGzFileUri)) {
+        return tarGzFileUri;
+      }
+      LOGGER.debug("No file found for segment: {} in deep store", segmentId); // caller warns on null return
+      return null;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while trying to find file for segment: {} in deep store", segmentId, e);
+      return null;
+    }
+  }
+
/**
* Removes aged deleted segments from the deleted directory
*/
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 639542eab1..7df080df26 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -22,6 +22,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
import org.apache.helix.InstanceType;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.api.resources.TableViews;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
@@ -63,7 +64,9 @@ public class TableViewsTest extends ControllerTest {
DEFAULT_INSTANCE.getHelixResourceManager().addTable(tableConfig);
DEFAULT_INSTANCE.getHelixResourceManager()
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME),
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
OFFLINE_SEGMENT_NAME), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
OFFLINE_SEGMENT_NAME),
+
getDownloadURL(DEFAULT_INSTANCE.getHelixResourceManager().getDataDir(),
OFFLINE_TABLE_NAME,
+ OFFLINE_SEGMENT_NAME));
// Create the hybrid table
DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME);
@@ -168,6 +171,10 @@ public class TableViewsTest extends ControllerTest {
TableViews.TableView.class);
}
+  private static String getDownloadURL(String dataDir, String tableName, String segmentName) {
+    return URIUtils.getUri(dataDir, tableName, URIUtils.encode(segmentName)).toString();
+  }
+
@AfterClass
public void tearDown() {
DEFAULT_INSTANCE.cleanup();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 095865d923..dda542d3d2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -50,6 +50,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -983,9 +984,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
new EndReplaceSegmentsRequest(Arrays.asList("s9", "s6"), null)));
// Try after new segments added to the table
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s20"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s20"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s20"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s21"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s21"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s21"));
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId,
new EndReplaceSegmentsRequest(Arrays.asList("s21"), null));
SegmentLineage segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
OFFLINE_TABLE_NAME);
@@ -1012,7 +1015,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Add 5 segments
for (int i = 0; i < 5; i++) {
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
}
assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false).size(), 5);
@@ -1057,9 +1061,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Try after new segments added to the table
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s5"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s5"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s5"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s6"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s6"));
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId1, null);
segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds(),
Collections.singleton(lineageEntryId1));
@@ -1080,7 +1086,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload partial data
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t1_0"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t1_0"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t1_0"));
IdealState idealState =
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
assertNotNull(idealState);
assertTrue(idealState.getPartitionSet().contains("merged_t1_0"));
@@ -1112,7 +1119,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload partial data
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t2_0"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t2_0"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t2_0"));
// Without force cleanup, 'startReplaceSegments' again should fail because
of duplicate segments on 'segmentFrom'
List<String> segmentsFrom4 = Arrays.asList("s1", "s2");
@@ -1140,9 +1148,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload segments again
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t3_0"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t3_0"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t3_0"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t3_1"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"merged_t3_1"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t3_1"));
// Finish the replacement
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId4, null);
@@ -1181,7 +1191,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload partial data
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s7"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s7"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s7"));
// Start another new segment replacement with empty segmentsFrom, and
check that previous lineages with empty
// segmentsFrom are not reverted
@@ -1196,9 +1207,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Finish the replacement
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s9"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s9"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s9"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s10"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s10"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s10"));
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId7, null);
segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
@@ -1221,7 +1234,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload partial data
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s11"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s11"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s11"));
// Start another new segment replacement with segmentsFrom overlapping
with previous lineage, and check that
// previous lineages with overlapped segmentsFrom are reverted
@@ -1236,9 +1250,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Finish the replacement
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s13"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s13"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s13"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s14"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s14"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s14"));
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId9, null);
segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
@@ -1286,7 +1302,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Add 3 segments
for (int i = 0; i < 3; i++) {
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
}
List<String> segmentsForTable =
_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false);
assertEquals(segmentsForTable.size(), 3);
@@ -1306,7 +1323,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Add new segments
for (int i = 3; i < 6; i++) {
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
}
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false), "s0", "s1", "s2", "s3", "s4",
"s5");
@@ -1342,7 +1360,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Add partial segments to indicate incomplete protocol
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s6"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s6"));
assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false).size(), 7);
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
true), "s3", "s4", "s5");
@@ -1375,7 +1394,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Add new segments
for (int i = 9; i < 12; i++) {
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
}
// Call end segment replacements
@@ -1396,7 +1416,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Re-upload (s9, s10, s11) to test the segment clean up from
startReplaceSegments
for (int i = 9; i < 12; i++) {
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
}
assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME,
false), "s3", "s4", "s5", "s9", "s10",
"s11");
@@ -1411,7 +1432,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload the new segments (s12, s13, s14)
for (int i = 12; i < 15; i++) {
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s"
+ i),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
}
// Call endReplaceSegments to start to use (s12, s13, s14)
@@ -1433,7 +1455,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload partial data
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s15"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s15"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s15"));
// Start another new segment replacement with empty segmentsFrom, and
check that previous lineages with empty
// segmentsFrom are not reverted
@@ -1447,9 +1470,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Finish the replacement
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s17"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s17"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s17"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s18"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s18"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s18"));
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId6, null);
segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(),
segmentsFrom6);
@@ -1469,7 +1494,8 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload partial data
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s19"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s19"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s19"));
// Start another new segment replacement with segmentsFrom overlapping
with previous lineage, and check that
// previous lineages with overlapped segmentsFrom are reverted
@@ -1483,9 +1509,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Finish the replacement
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s21"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s21"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s21"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s22"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s22"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s22"));
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId8, null);
segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
OFFLINE_TABLE_NAME);
@@ -1506,9 +1534,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Upload data
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s23"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s23"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s23"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s24"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s24"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s24"));
// Start another new segment replacement with segmentsTo overlapping with
previous lineage, and check that previous
// lineages with overlapped segmentsTo are reverted
@@ -1526,9 +1556,11 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
// Finish the replacement
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s24"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s24"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s24"));
_helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
- SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s25"), "downloadUrl");
+ SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME,
"s25"),
+ getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s25"));
_helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME,
lineageEntryId10, null);
segmentLineage =
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore,
OFFLINE_TABLE_NAME);
assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getSegmentsFrom(),
segmentsFrom10);
@@ -1541,6 +1573,10 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
assertNull(segmentLineage);
}
+  private static String getDownloadURL(String dataDir, String tableName, String segmentName) {
+    return URIUtils.getUri(dataDir, tableName, URIUtils.encode(segmentName)).toString();
+  }
+
private static void assertSetEquals(Collection<String> actual, String...
expected) {
Set<String> actualSet = actual instanceof Set ? (Set<String>) actual : new
HashSet<>(actual);
assertEquals(actualSet, new HashSet<>(Arrays.asList(expected)));
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index e600643934..01cfa1ae76 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -33,11 +33,13 @@ import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
@@ -54,6 +56,7 @@ import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static
org.apache.pinot.common.metadata.ZKMetadataProvider.constructPropertyStorePathForSegment;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
@@ -352,6 +355,76 @@ public class SegmentDeletionManagerTest {
}, 2000L, 10_000L, "Unable to verify table deletion with retention");
}
+
+ @Test
+ public void testSegmentDeletionLogicWithFileWithGZExtension()
+ throws Exception {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY
+ ".class",
+ LocalPinotFS.class.getName());
+ PinotFSFactory.init(new PinotConfiguration(properties));
+
+ HelixAdmin helixAdmin = makeHelixAdmin();
+ ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
+ File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+ SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+ tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
+
+ // create table segment files.
+ Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
+ createTableAndSegmentFilesWithGZExtension(tempDir,
segmentsThatShouldBeDeleted());
+ final File tableDir = new File(tempDir.getAbsolutePath() + File.separator
+ TABLE_NAME);
+ final File deletedTableDir = new File(tempDir.getAbsolutePath() +
File.separator + "Deleted_Segments"
+ + File.separator + TABLE_NAME);
+
+ // mock returning ZK Metadata for segment url
+ ZNRecord znRecord1 = mock(org.apache.helix.ZNRecord.class);
+ ZNRecord znRecord2 = mock(org.apache.helix.ZNRecord.class);
+ ZNRecord znRecord3 = mock(org.apache.helix.ZNRecord.class);
+ List<ZNRecord> znRecordList = List.of(znRecord1, znRecord2, znRecord3);
+ for (int i = 0; i < 3; i++) {
+
when(znRecordList.get(i).getSimpleFields()).thenReturn(Map.of(CommonConstants.Segment.DOWNLOAD_URL,
+ tableDir.getAbsolutePath() + File.separator +
segmentsThatShouldBeDeleted().get(i)
+ + TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
+ when(propertyStore.get(constructPropertyStorePathForSegment(TABLE_NAME,
segmentsThatShouldBeDeleted().get(i)),
+ null, AccessOption.PERSISTENT)).thenReturn(znRecordList.get(i));
+ }
+
+ // delete the segments instantly.
+ SegmentsValidationAndRetentionConfig mockValidationConfig =
mock(SegmentsValidationAndRetentionConfig.class);
+
when(mockValidationConfig.getDeletedSegmentsRetentionPeriod()).thenReturn("0d");
+ TableConfig mockTableConfig = mock(TableConfig.class);
+
when(mockTableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
+ deletionManager.deleteSegments(TABLE_NAME, segments, mockTableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ Assert.assertEquals(tableDir.listFiles().length, 0);
+ Assert.assertTrue(!deletedTableDir.exists() ||
deletedTableDir.listFiles().length == 0);
+ return true;
+ } catch (Throwable t) {
+ return false;
+ }
+ }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+
+ // create table segment files again to test default retention.
+ createTableAndSegmentFilesWithGZExtension(tempDir,
segmentsThatShouldBeDeleted());
+ // delete the segments with default retention
+ deletionManager.deleteSegments(TABLE_NAME, segments);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ Assert.assertEquals(tableDir.listFiles().length, 0);
+ Assert.assertEquals(deletedTableDir.listFiles().length,
segments.size());
+ return true;
+ } catch (Throwable t) {
+ return false;
+ }
+ }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+ }
+
+
public void createTableAndSegmentFiles(File tempDir, List<String> segmentIds)
throws Exception {
File tableDir = new File(tempDir.getAbsolutePath() + File.separator +
TABLE_NAME);
@@ -364,6 +437,20 @@ public class SegmentDeletionManagerTest {
}
}
+  public void createTableAndSegmentFilesWithGZExtension(File tempDir, List<String> segmentIds)
+      throws Exception {
+    File tableDir = new File(tempDir.getAbsolutePath() + File.separator + TABLE_NAME);
+    tableDir.mkdir();
+    String segmentPathPrefix = tableDir.getAbsolutePath() + File.separator;
+    for (String segmentId : segmentIds) {
+      // Segment file using the "<segmentId>.tar.gz" naming variant, created with age 0
+      createTestFileWithAge(segmentPathPrefix + segmentId + TarCompressionUtils.TAR_GZ_FILE_EXTENSION, 0);
+      // Companion segment metadata file
+      createTestFileWithAge(segmentPathPrefix + segmentId + Constants.METADATA_TAR_GZ_FILE_EXT, 0);
+    }
+  }
+
+
public String genDeletedSegmentName(String fileName, int age, int
retentionInDays) {
// adding one more hours to the deletion time just to make sure the test
goes pass the retention period because
// we no longer keep second level info in the date format.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]