This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e8db382d51 [Backfill] allow externally partitioned segment uploads for upsert tables (#13107) e8db382d51 is described below commit e8db382d519aa59310f5788b75f5b1cc13143f55 Author: rohit <roh...@uber.com> AuthorDate: Mon Jun 10 21:48:44 2024 +0530 [Backfill] allow externally partitioned segment uploads for upsert tables (#13107) * [Backfill] allow externally partitioned segment uploads for upsert tables * upload segment with partitionId * revise uploaded realtime segment name convention --- .../apache/pinot/common/utils/SegmentUtils.java | 6 + .../common/utils/UploadedRealtimeSegmentName.java | 181 +++++++++++++++++++++ .../pinot/common/utils/SegmentUtilsTest.java | 60 +++++++ .../utils/UploadedRealtimeSegmentNameTest.java | 58 +++++++ .../batch/common/SegmentGenerationTaskRunner.java | 23 ++- ...oncurrentMapPartitionUpsertMetadataManager.java | 49 +++++- ...rrentMapPartitionUpsertMetadataManagerTest.java | 169 +++++++++++++++++++ .../spi/creator/SegmentGeneratorConfig.java | 21 +++ .../creator/name/SegmentNameGeneratorFactory.java | 1 + .../name/UploadedRealtimeSegmentNameGenerator.java | 91 +++++++++++ .../spi/creator/SegmentGeneratorConfigTest.java | 15 ++ .../UploadedRealtimeSegmentNameGeneratorTest.java | 27 +-- .../spi/ingestion/batch/BatchConfigProperties.java | 1 + 13 files changed, 682 insertions(+), 20 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java index aaf44dc441..8b89a2b1a5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java @@ -44,6 +44,12 @@ public class SegmentUtils { if (llcSegmentName != null) { return llcSegmentName.getPartitionGroupId(); } + + UploadedRealtimeSegmentName uploadedRealtimeSegmentName = 
UploadedRealtimeSegmentName.of(segmentName); + if (uploadedRealtimeSegmentName != null) { + return uploadedRealtimeSegmentName.getPartitionId(); + } + // Otherwise, retrieve the partition id from the segment zk metadata. SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), realtimeTableName, segmentName); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java new file mode 100644 index 0000000000..ec2b257b12 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.utils; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + + +/** + * Class to represent segment names like: {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix} + * + * <p>This naming convention is adopted to represent a segment uploaded to a realtime table. The naming + * convention has been kept semantically similar to {@link LLCSegmentName} but differs in the following ways: + * + * <li> prefix to quickly identify the type/source of segment e.g. "uploaded"/"minion" + * <li> name of the table to which the segment belongs + * <li> partitionId which should be consistent with the stream partitioning in case of upsert realtime tables. + * <li> creationTime: creation time of the segment, in the format yyyyMMdd'T'HHmm'Z' + * <li> suffix to uniquely identify segments created at the same time. + * + * Use {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} to generate segment names. 
+ */ +public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> { + + private static final String SEPARATOR = "__"; + private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'"; + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC(); + private final String _prefix; + private final String _tableName; + private final int _partitionId; + private final String _creationTime; + private final String _segmentName; + private final String _suffix; + + public UploadedRealtimeSegmentName(String segmentName) { + try { + String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); + Preconditions.checkState(parts.length == 5, + "Uploaded segment name must be of the format {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}"); + _prefix = parts[0]; + _tableName = parts[1]; + _partitionId = Integer.parseInt(parts[2]); + _creationTime = parts[3]; + _suffix = parts[4]; + _segmentName = segmentName; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid segment name: " + segmentName, e); + } + } + + /** + * Constructor for UploadedRealtimeSegmentName. 
+ * @param tableName + * @param partitionId + * @param msSinceEpoch + * @param prefix + * @param suffix + */ + public UploadedRealtimeSegmentName(String tableName, int partitionId, long msSinceEpoch, String prefix, + String suffix) { + Preconditions.checkArgument( + StringUtils.isNotBlank(tableName) && !tableName.contains(SEPARATOR) && StringUtils.isNotBlank(prefix) + && !prefix.contains(SEPARATOR) && StringUtils.isNotBlank(suffix) && !suffix.contains(SEPARATOR), + "tableName, prefix and suffix must be non-null, non-empty and not contain '__'"); + _tableName = tableName; + _partitionId = partitionId; + _creationTime = DATE_FORMATTER.print(msSinceEpoch); + _prefix = prefix; + _suffix = suffix; + _segmentName = Joiner.on(SEPARATOR).join(prefix, tableName, partitionId, _creationTime, suffix); + } + + /** + * Checks if the segment name is of the format: {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix} + * @param segmentName + * @return boolean true if the segment name is of the format: {prefix}__{tableName}__{partitionId}__{creationTime} + * __{suffix} + */ + public static boolean isUploadedRealtimeSegmentName(String segmentName) { + int numSeparators = 0; + int index = 0; + while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) { + numSeparators++; + index += 2; // SEPARATOR.length() + } + return numSeparators == 4; + } + + @Nullable + public static UploadedRealtimeSegmentName of(String segmentName) { + try { + return new UploadedRealtimeSegmentName(segmentName); + } catch (Exception e) { + return null; + } + } + + public String getTableName() { + return _tableName; + } + + public int getPartitionId() { + return _partitionId; + } + + /** + * Returns the creation time in the format yyyyMMdd'T'HHmm'Z' + * To be used for only human readability and not for any computation + * @return + */ + public String getCreationTime() { + return _creationTime; + } + + public String getSegmentName() { + return _segmentName; + } + + public String getPrefix() { + 
return _prefix; + } + + public String getSuffix() { + return _suffix; + } + + /** + * Compares the string representation of the segment name. + * @param other the object to be compared. + * @return + */ + @Override + public int compareTo(UploadedRealtimeSegmentName other) { + Preconditions.checkState(_tableName.equals(other._tableName), + "Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName()); + return _segmentName.compareTo(other._segmentName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof UploadedRealtimeSegmentName)) { + return false; + } + UploadedRealtimeSegmentName that = (UploadedRealtimeSegmentName) o; + return _segmentName.equals(that._segmentName); + } + + @Override + public int hashCode() { + return Objects.hash(_segmentName); + } + + @Override + public String toString() { + return _segmentName; + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java index 203cc249d7..1257bc0498 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java @@ -18,14 +18,29 @@ */ package org.apache.pinot.common.utils; +import java.util.HashMap; +import java.util.HashSet; +import org.apache.helix.HelixManager; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; 
+import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.AssertJUnit.fail; public class SegmentUtilsTest { + private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME"; private static final String SEGMENT = "testSegment"; + private static final String PARTITION_COLUMN = "partitionColumn"; @Test public void testGetSegmentCreationTimeMs() { @@ -35,4 +50,49 @@ public class SegmentUtilsTest { segmentZKMetadata.setPushTime(2000L); assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 2000L); } + + @Test + public void testGetRealtimeSegmentPartitionIdFromZkMetadata() { + + // mocks + SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class); + SegmentPartitionMetadata segmentPartitionMetadata = mock(SegmentPartitionMetadata.class); + HashMap<String, ColumnPartitionMetadata> columnPartitionMetadataMap = new HashMap<>(); + HashSet<Integer> partitions = new HashSet<>(); + partitions.add(3); + columnPartitionMetadataMap.put(PARTITION_COLUMN, + new ColumnPartitionMetadata("modulo", 8, partitions, new HashMap<>())); + + when(segmentPartitionMetadata.getColumnPartitionMap()).thenReturn(columnPartitionMetadataMap); + when(segmentZKMetadata.getPartitionMetadata()).thenReturn(segmentPartitionMetadata); + + HelixManager helixManager = mock(HelixManager.class); + ZkHelixPropertyStore zkHelixPropertyStore = mock(ZkHelixPropertyStore.class); + when(helixManager.getHelixPropertyStore()).thenReturn(zkHelixPropertyStore); + + // mock static ZKMetadataProvider.getSegmentZKMetadata + try (MockedStatic<ZKMetadataProvider> zkMetadataProviderMockedStatic = Mockito.mockStatic( + ZKMetadataProvider.class)) { + when(ZKMetadataProvider.getSegmentZKMetadata(Mockito.any(ZkHelixPropertyStore.class), eq(TABLE_NAME_WITH_TYPE), + eq(SEGMENT))).thenReturn(segmentZKMetadata); + + Integer partitionId = + SegmentUtils.getRealtimeSegmentPartitionId(SEGMENT, TABLE_NAME_WITH_TYPE, helixManager, PARTITION_COLUMN); + 
+ assertEquals(partitionId, 3); + } + } + + @Test + void testGetRealtimeSegmentPartitionIdForUploadedRealtimeSegment() { + String segmentName = "uploaded__table_name__3__100__1716185755000"; + + try { + Integer partitionId = + SegmentUtils.getRealtimeSegmentPartitionId(segmentName, "realtimeTableName", null, "partitionColumn"); + assertEquals(partitionId, 3); + } catch (Exception e) { + fail("Exception should not be thrown"); + } + } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java new file mode 100644 index 0000000000..0cfb8a0196 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.utils; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class UploadedRealtimeSegmentNameTest { + + @Test + public void testSegmentNameParsing() { + String segmentName = "uploaded__table_name__1__20240530T0000Z__suffix"; + UploadedRealtimeSegmentName uploadedRealtimeSegmentName = new UploadedRealtimeSegmentName(segmentName); + + Assert.assertEquals(uploadedRealtimeSegmentName.getTableName(), "table_name"); + Assert.assertEquals(uploadedRealtimeSegmentName.getPartitionId(), 1); + Assert.assertEquals(uploadedRealtimeSegmentName.getPrefix(), "uploaded"); + Assert.assertEquals(uploadedRealtimeSegmentName.getSuffix(), "suffix"); + Assert.assertEquals(uploadedRealtimeSegmentName.getCreationTime(), "20240530T0000Z"); + } + + @Test + public void testSegmentNameGeneration() { + UploadedRealtimeSegmentName uploadedRealtimeSegmentName = + new UploadedRealtimeSegmentName("tableName", 1, 1717027200000L, "uploaded", "2"); + String expectedSegmentName = "uploaded__tableName__1__20240530T0000Z__2"; + + Assert.assertEquals(uploadedRealtimeSegmentName.getSegmentName(), expectedSegmentName); + } + + @Test + public void testIsUploadedRealtimeSegmentName() { + String validSegmentName = "uploaded__table__0__20220101T0000Z__suffix"; + Assert.assertTrue(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(validSegmentName)); + + String invalidSegmentName = "uploaded__table__0__20220101T0000Z"; + Assert.assertFalse(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(invalidSegmentName)); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java index 8fc421f79e..37ff9bc47c 100644 --- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java @@ -19,6 +19,7 @@ package org.apache.pinot.plugin.ingestion.batch.common; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -29,6 +30,7 @@ import org.apache.pinot.segment.spi.creator.name.InputFileSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator; +import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; @@ -159,14 +161,25 @@ public class SegmentGenerationTaskRunner implements Serializable { Boolean.parseBoolean(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)), IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig), IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig), dateTimeFormatSpec, - segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX), - appendUUIDToSegmentName); + segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX), appendUUIDToSegmentName); case BatchConfigProperties.SegmentNameGeneratorType.INPUT_FILE: String inputFileUri = _taskSpec.getCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY); return new InputFileSegmentNameGenerator(segmentNameGeneratorConfigs.get(FILE_PATH_PATTERN), - segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE), - inputFileUri, - 
appendUUIDToSegmentName); + segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE), inputFileUri, appendUUIDToSegmentName); + case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME: + Preconditions.checkState(segmentGeneratorConfig.getCreationTime() != null, + "Creation time must be set for uploaded realtime segment name generator"); + Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId() != -1, + "Valid partition id must be set for uploaded realtime segment name generator"); + long creationTime; + try { + creationTime = Long.parseLong(segmentGeneratorConfig.getCreationTime()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Creation time must be a valid long value in segmentGeneratorConfig"); + } + return new UploadedRealtimeSegmentNameGenerator(tableName, + segmentGeneratorConfig.getUploadedSegmentPartitionId(), creationTime, + segmentGeneratorConfig.getSegmentNamePrefix(), segmentGeneratorConfig.getSegmentNamePostfix()); default: throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index 5980cbbace..86146b8802 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.UploadedRealtimeSegmentName; import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.segment.readers.LazyRow; import org.apache.pinot.segment.local.utils.HashUtils; @@ -135,10 +136,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp // Update the record location when getting a newer comparison value, or the value is the same as the // current value, but the segment has a larger sequence number (the segment is newer than the current // segment). - if (comparisonResult > 0 || (comparisonResult == 0 && LLCSegmentName.isLLCSegment(segmentName) - && LLCSegmentName.isLLCSegment(currentSegmentName) - && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber( - currentSegmentName))) { + if (comparisonResult > 0 || (comparisonResult == 0 && shouldReplaceOnComparisonTie(segmentName, + currentSegmentName, segment.getSegmentMetadata().getIndexCreationTime(), + currentSegment.getSegmentMetadata().getIndexCreationTime()))) { replaceDocId(segment, validDocIds, queryableDocIds, currentSegment, currentDocId, newDocId, recordInfo); return new RecordLocation(segment, newDocId, newComparisonValue); } else { @@ -158,6 +158,47 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } } + /** + * <li> When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to + * row in segment with higher sequence id. + * <li> If either or both are not LLC segment, then resolve based on creation time of segment. If creation time is + * same then prefer uploaded segment if other is LLCSegmentName + * <li> If both are uploaded segment, prefer standard UploadedRealtimeSegmentName, if still a tie, then resolve to + * current segment. 
+ * + * @param segmentName replacing segment name + * @param currentSegmentName current segment name having the record for the given primary key + * @param segmentCreationTimeMs replacing segment creation time + * @param currentSegmentCreationTimeMs current segment creation time + * @return true if the record in replacing segment should replace the record in current segment + */ + protected boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName, + long segmentCreationTimeMs, long currentSegmentCreationTimeMs) { + + // resolve using sequence id if both are LLCSegmentName + LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); + LLCSegmentName currentLLCSegmentName = LLCSegmentName.of(currentSegmentName); + if (llcSegmentName != null && currentLLCSegmentName != null) { + return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber(); + } + + // either or both are uploaded segments, prefer the latest segment + int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs); + if (creationTimeComparisonRes != 0) { + return creationTimeComparisonRes > 0; + } + + // if both are uploaded segment, prefer standard UploadedRealtimeSegmentName, if still a tie, then resolve to + // current segment + if (UploadedRealtimeSegmentName.of(currentSegmentName) != null) { + return false; + } + if (UploadedRealtimeSegmentName.of(segmentName) != null) { + return true; + } + return false; + } + @Override protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 
d6f3107b74..4270e9547d 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.UploadedRealtimeSegmentName; import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; @@ -378,6 +379,143 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true); } + @Test + public void verifyAddReplaceUploadedSegment1() + throws IOException { + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(HashFunction.NONE).build()); + Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments; + + // Add the first segment + int numRecords = 6; + int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0}; + int[] timestamps = new int[]{100, 100, 100, 80, 120, 100}; + ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); + List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); + SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); + when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L); + ImmutableSegmentImpl segment1 = + mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, 
primaryKeys1, segmentMetadata, null); + List<RecordInfo> recordInfoList1; + // get recordInfo by iterating all records. + recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null); + upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator()); + trackedSegments.add(segment1); + // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} + assertEquals(recordLocationMap.size(), 3); + checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5}); + + // Add the second segment of uploaded name format with same creation time + numRecords = 2; + primaryKeys = new int[]{0, 3}; + timestamps = new int[]{100, 80}; + ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl uploadedSegment2 = + mockUploadedImmutableSegment("2", validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1000L); + List<RecordInfo> recordInfoList2; + // get recordInfo by iterating all records. 
+ recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null); + upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, recordInfoList2.iterator()); + trackedSegments.add(uploadedSegment2); + + // segment1: 1 -> {4, 120}, 2 -> {2, 100} + // uploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80} + assertEquals(recordLocationMap.size(), 4); + checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); + + // replace uploadedSegment2 + ThreadSafeMutableRoaringBitmap newValidDocIds2 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl newUploadedSegment2 = + mockUploadedImmutableSegment("2", newValidDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1020L); + upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, null, recordInfoList2.iterator(), + uploadedSegment2); + trackedSegments.add(newUploadedSegment2); + trackedSegments.remove(uploadedSegment2); + + // segment1: 1 -> {4, 120}, 2 -> {2, 100} + // newUploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80} + assertEquals(recordLocationMap.size(), 4); + checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4}); + 
+ assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1}); + + // add uploadedSegment3 with higher creation time than newUploadedSegment2 + numRecords = 1; + primaryKeys = new int[]{0}; + timestamps = new int[]{100}; + ThreadSafeMutableRoaringBitmap validDocIds3 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl uploadedSegment3 = + mockUploadedImmutableSegment("3", validDocIds3, null, getPrimaryKeyList(numRecords, primaryKeys), 1040L); + List<RecordInfo> recordInfoList3; + // get recordInfo by iterating all records. + recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, null); + upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, recordInfoList3.iterator()); + + // segment1: 1 -> {4, 120}, 2 -> {2, 100} + // newUploadedSegment2: 3 -> {1, 80} + // uploadedSegment3: 0 -> {0, 100} + assertEquals(recordLocationMap.size(), 4); + checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4}); + assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1}); + assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new int[]{0}); + + // add uploadedSegment4 with higher creation time than segment 1 and same creation time as uploadedSegment3 + numRecords = 2; + primaryKeys = new int[]{0, 1}; + timestamps = new int[]{100, 120}; + ThreadSafeMutableRoaringBitmap validDocIds4 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl uploadedSegment4 = + mockUploadedImmutableSegment("4", validDocIds4, null, getPrimaryKeyList(numRecords, primaryKeys), 1040L); + List<RecordInfo> recordInfoList4; + // get recordInfo by 
iterating all records. + recordInfoList4 = getRecordInfoList(numRecords, primaryKeys, timestamps, null); + upsertMetadataManager.addSegment(uploadedSegment4, validDocIds4, null, recordInfoList4.iterator()); + + // segment1: 2 -> {2, 100} + // newUploadedSegment2: 3 -> {1, 80} + // uploadedSegment3: 0 -> {0, 100} + // uploadedSegment4: 1 -> {1, 120} + assertEquals(recordLocationMap.size(), 4); + checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2}); + assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1}); + assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new int[]{0}); + assertEquals(validDocIds4.getMutableRoaringBitmap().toArray(), new int[]{1}); + + // remove segments + upsertMetadataManager.removeSegment(segment1); + upsertMetadataManager.removeSegment(uploadedSegment2); + upsertMetadataManager.removeSegment(newUploadedSegment2); + upsertMetadataManager.removeSegment(uploadedSegment3); + upsertMetadataManager.removeSegment(uploadedSegment4); + + // Stop the metadata manager + upsertMetadataManager.stop(); + + // Close the metadata manager + upsertMetadataManager.close(); + } + private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction, boolean enableSnapshot) throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = @@ -606,6 +744,33 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { when(forwardIndex.getInt(anyInt(), any())).thenAnswer( invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]); when(dataSource.getForwardIndex()).thenReturn(forwardIndex); + 
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); + when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis()); + when(segment.getSegmentMetadata()).thenReturn(segmentMetadata); + return segment; + } + + private static ImmutableSegmentImpl mockUploadedImmutableSegment(String suffix, + ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, + List<PrimaryKey> primaryKeys, Long creationTimeMs) { + if (creationTimeMs == null) { + creationTimeMs = System.currentTimeMillis(); + } + ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class); + when(segment.getSegmentName()).thenReturn(getUploadedRealtimeSegmentName(creationTimeMs, suffix)); + when(segment.getValidDocIds()).thenReturn(validDocIds); + when(segment.getQueryableDocIds()).thenReturn(queryableDocIds); + DataSource dataSource = mock(DataSource.class); + when(segment.getDataSource(anyString())).thenReturn(dataSource); + ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class); + when(forwardIndex.isSingleValue()).thenReturn(true); + when(forwardIndex.getStoredType()).thenReturn(DataType.INT); + when(forwardIndex.getInt(anyInt(), any())).thenAnswer( + invocation -> primaryKeys.get(invocation.getArgument(0)).getValues()[0]); + when(dataSource.getForwardIndex()).thenReturn(forwardIndex); + SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); + when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs); + when(segment.getSegmentMetadata()).thenReturn(segmentMetadata); return segment; } @@ -656,6 +821,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, System.currentTimeMillis()).toString(); } + private static String getUploadedRealtimeSegmentName(long creationTimeMs, String suffix) { + return new UploadedRealtimeSegmentName(RAW_TABLE_NAME, 0, creationTimeMs, "uploaded", suffix).toString(); + } + private 
static PrimaryKey makePrimaryKey(int value) { return new PrimaryKey(new Object[]{value}); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java index c879c1be52..0742a814f0 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java @@ -40,6 +40,7 @@ import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator; +import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil; import org.apache.pinot.segment.spi.index.ForwardIndexConfig; @@ -107,6 +108,8 @@ public class SegmentGeneratorConfig implements Serializable { private String _creatorVersion = null; private SegmentNameGenerator _segmentNameGenerator = null; private SegmentPartitionConfig _segmentPartitionConfig = null; + + private int _uploadedSegmentPartitionId = -1; private int _sequenceId = -1; private TimeColumnType _timeColumnType = TimeColumnType.EPOCH; private DateTimeFormatSpec _dateTimeFormatSpec = null; @@ -463,6 +466,9 @@ public class SegmentGeneratorConfig implements Serializable { _segmentTimeColumnName = timeColumnName; } + public int getUploadedSegmentPartitionId() { + return _uploadedSegmentPartitionId; + } public int getSequenceId() { return _sequenceId; } @@ -475,6 +481,13 @@ public class SegmentGeneratorConfig implements Serializable { return _fstTypeForFSTIndex; } + /** + * Use this method to add 
partitionId if it is generated externally during segment upload + */ + public void setUploadedSegmentPartitionId(int partitionId) { + _uploadedSegmentPartitionId = partitionId; + } + /** * This method should be used instead of setPostfix if you are adding a sequence number. */ @@ -581,6 +594,9 @@ public class SegmentGeneratorConfig implements Serializable { IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig), IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig), _dateTimeFormatSpec, _segmentNamePostfix); + case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME: + return new UploadedRealtimeSegmentNameGenerator(_rawTableName, _uploadedSegmentPartitionId, + Long.parseLong(_segmentCreationTime), _segmentNamePrefix, _segmentNamePostfix); default: return new SimpleSegmentNameGenerator(_segmentNamePrefix != null ? _segmentNamePrefix : _rawTableName, _segmentNamePostfix); @@ -600,6 +616,11 @@ public class SegmentGeneratorConfig implements Serializable { return BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE; } + // if segment is externally partitioned + if (_uploadedSegmentPartitionId != -1) { + return BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME; + } + return BatchConfigProperties.SegmentNameGeneratorType.SIMPLE; } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java index 9fd6c97f35..498079da82 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java @@ -32,6 +32,7 @@ public class SegmentNameGeneratorFactory { public static final String FIXED_SEGMENT_NAME_GENERATOR = "fixed"; public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple"; 
public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizeddate"; + public static final String UPLOADED_REALTIME = "uploadedrealtime"; private SegmentNameGeneratorFactory() { } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java new file mode 100644 index 0000000000..223649734d --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator.name; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; + + +/** + * Implementation for generating segment names of the format UploadedRealtimeSegmentName: + * {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix} + * + * <p> Naming convention to represent uploaded segments to a realtime table see UploadedRealtimeSegmentName. 
The + * semantic is similar to LLCSegmentName. This naming convention should be preferred when the data is partitioned in + * generated segments and should be assigned based on partitionId to ensure consistency with stream partitioning for + * upsert tables. + */ +public class UploadedRealtimeSegmentNameGenerator implements SegmentNameGenerator { + + private static final String DELIMITER = "__"; + private final String _tableName; + private final int _partitionId; + private final long _creationTimeMillis; + private final String _prefix; + + // if suffix is not set then sequenceId is used as segment name suffix + @Nullable + private final String _suffix; + + /** + * Creates a UploadedRealtimeSegmentNameGenerator + * @param tableName + * @param partitionId + * @param creationTimeMillis + * @param prefix + * @param suffix optional field for generator, if not specified then sequenceId is used as suffix + */ + public UploadedRealtimeSegmentNameGenerator(String tableName, int partitionId, long creationTimeMillis, String prefix, + @Nullable String suffix) { + Preconditions.checkArgument( + StringUtils.isNotBlank(tableName) && !tableName.contains(DELIMITER) && StringUtils.isNotBlank(prefix) + && !prefix.contains(DELIMITER), "Invalid tableName or prefix for UploadedRealtimeSegmentNameGenerator"); + Preconditions.checkArgument(creationTimeMillis > 0, "Creation time must be greater than 0"); + if (suffix != null) { + Preconditions.checkArgument(StringUtils.isNotBlank(suffix) && !suffix.contains(DELIMITER), + "Invalid suffix for UploadedRealtimeSegmentNameGenerator"); + } + _tableName = tableName; + _partitionId = partitionId; + _creationTimeMillis = creationTimeMillis; + _prefix = prefix; + _suffix = suffix; + } + + @Override + public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) { + return Joiner.on(DELIMITER).join(_prefix, _tableName, _partitionId, _creationTimeMillis, + StringUtils.isBlank(_suffix) ? 
sequenceId : _suffix); + } + + @Override + public String toString() { + StringBuilder stringBuilder = + new StringBuilder("UploadedRealtimeSegmentNameGenerator: tableName=").append(_tableName); + stringBuilder.append(", prefix=").append(_prefix); + stringBuilder.append(", partitionId=").append(_partitionId); + if (_suffix != null) { + stringBuilder.append(", suffix=").append(_suffix); + } + stringBuilder.append(", creationTimeMillis=").append(_creationTimeMillis); + return stringBuilder.toString(); + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java index 36a7a0b0dd..39e4ce9c10 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator; +import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; @@ -140,6 +141,20 @@ public class SegmentGeneratorConfigTest { Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator() instanceof SimpleSegmentNameGenerator); Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator().toString().contains("tableName=testTable")); + // Table config is externally partitioned + tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName("test").build(); + + segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, 
schema); + segmentGeneratorConfig.setUploadedSegmentPartitionId(0); + segmentGeneratorConfig.setCreationTime("1234567890"); + segmentGeneratorConfig.setSegmentNamePrefix("prefix"); + segmentGeneratorConfig.setSegmentNamePostfix("5"); + + Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator() instanceof UploadedRealtimeSegmentNameGenerator); + Assert.assertTrue( + segmentGeneratorConfig.getSegmentNameGenerator().toString().contains("tableName=test")); + // Table config has no time column defined tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java similarity index 55% copy from pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java copy to pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java index 203cc249d7..71ddd12a83 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java @@ -16,23 +16,28 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.common.utils; +package org.apache.pinot.segment.spi.creator.name; -import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; +import static org.testng.Assert.*; -public class SegmentUtilsTest { - private static final String SEGMENT = "testSegment"; +public class UploadedRealtimeSegmentNameGeneratorTest { @Test - public void testGetSegmentCreationTimeMs() { - SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(SEGMENT); - segmentZKMetadata.setCreationTime(1000L); - assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 1000L); - segmentZKMetadata.setPushTime(2000L); - assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 2000L); + public void testGenerateSegmentName() { + String tableName = "tableName"; + int partitionId = 1; + long creationTimeMillis = 1234567890L; + int sequenceId = 2; + + UploadedRealtimeSegmentNameGenerator generator = + new UploadedRealtimeSegmentNameGenerator(tableName, partitionId, creationTimeMillis, "prefix", "suffix"); + String expectedSegmentName = "prefix__tableName__1__1234567890__suffix"; + + String actualSegmentName = generator.generateSegmentName(sequenceId, null, null); + + assertEquals(actualSegmentName, expectedSegmentName); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java index 0782b7a8ec..efb11bc633 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java @@ -74,6 +74,7 @@ public class BatchConfigProperties { public static final String SIMPLE = "simple"; public static final String NORMALIZED_DATE = "normalizedDate"; public static final String FIXED = "fixed"; + public static final String 
UPLOADED_REALTIME = "uploadedRealtime"; public static final String INPUT_FILE = "inputFile"; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org