This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e8db382d51 [Backfill] allow externally partitioned segment uploads for 
upsert tables (#13107)
e8db382d51 is described below

commit e8db382d519aa59310f5788b75f5b1cc13143f55
Author: rohit <roh...@uber.com>
AuthorDate: Mon Jun 10 21:48:44 2024 +0530

    [Backfill] allow externally partitioned segment uploads for upsert tables 
(#13107)
    
    * [Backfill] allow externally partitioned segment uploads for upsert tables
    
    * upload segment with partitionId
    
    * revise uploaded realtime segment name convention
---
 .../apache/pinot/common/utils/SegmentUtils.java    |   6 +
 .../common/utils/UploadedRealtimeSegmentName.java  | 181 +++++++++++++++++++++
 .../pinot/common/utils/SegmentUtilsTest.java       |  60 +++++++
 .../utils/UploadedRealtimeSegmentNameTest.java     |  58 +++++++
 .../batch/common/SegmentGenerationTaskRunner.java  |  23 ++-
 ...oncurrentMapPartitionUpsertMetadataManager.java |  49 +++++-
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 169 +++++++++++++++++++
 .../spi/creator/SegmentGeneratorConfig.java        |  21 +++
 .../creator/name/SegmentNameGeneratorFactory.java  |   1 +
 .../name/UploadedRealtimeSegmentNameGenerator.java |  91 +++++++++++
 .../spi/creator/SegmentGeneratorConfigTest.java    |  15 ++
 .../UploadedRealtimeSegmentNameGeneratorTest.java  |  27 +--
 .../spi/ingestion/batch/BatchConfigProperties.java |   1 +
 13 files changed, 682 insertions(+), 20 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index aaf44dc441..8b89a2b1a5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -44,6 +44,12 @@ public class SegmentUtils {
     if (llcSegmentName != null) {
       return llcSegmentName.getPartitionGroupId();
     }
+
+    UploadedRealtimeSegmentName uploadedRealtimeSegmentName = 
UploadedRealtimeSegmentName.of(segmentName);
+    if (uploadedRealtimeSegmentName != null) {
+      return uploadedRealtimeSegmentName.getPartitionId();
+    }
+
     // Otherwise, retrieve the partition id from the segment zk metadata.
     SegmentZKMetadata segmentZKMetadata =
         
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), 
realtimeTableName, segmentName);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java
new file mode 100644
index 0000000000..ec2b257b12
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * <p>This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept semantically similar to {@link LLCSegmentName} but differs in the following ways:
+ *
+ * <li> prefix to quickly identify the type/source of segment e.g. 
"uploaded"/"minion"
+ * <li> name of the table to which the segment belongs
+ * <li> partitionId which should be consistent with the stream partitioning in case of upsert realtime tables.
+ * <li> creationTime, the creation time of the segment in the format yyyyMMdd'T'HHmm'Z'
+ * <li> suffix to uniquely identify segments created at the same time.
+ *
+ * Use {@link 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} 
to generate segment names.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable<UploadedRealtimeSegmentName> {
+
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _prefix;
+  private final String _tableName;
+  private final int _partitionId;
+  private final String _creationTime;
+  private final String _segmentName;
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+    try {
+      String[] parts = StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR);
+      Preconditions.checkState(parts.length == 5,
+          "Uploaded segment name must be of the format 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}");
+      _prefix = parts[0];
+      _tableName = parts[1];
+      _partitionId = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+      _suffix = parts[4];
+      _segmentName = segmentName;
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid segment name: " + 
segmentName, e);
+    }
+  }
+
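+  /**
+   * Builds a segment name of the form {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}.
+   * @param tableName name of the table; must be non-blank and must not contain "__"
+   * @param partitionId partition id embedded in the segment name
+   * @param msSinceEpoch creation time in milliseconds since epoch; rendered in UTC as yyyyMMdd'T'HHmm'Z'
+   * @param prefix segment name prefix; must be non-blank and must not contain "__"
+   * @param suffix segment name suffix; must be non-blank and must not contain "__"
+   */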
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, long 
msSinceEpoch, String prefix,
+      String suffix) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(tableName) && !tableName.contains(SEPARATOR) && 
StringUtils.isNotBlank(prefix)
+            && !prefix.contains(SEPARATOR) && StringUtils.isNotBlank(suffix) 
&& !suffix.contains(SEPARATOR),
+        "tableName, prefix and suffix must be non-null, non-empty and not 
contain '__'");
+    _tableName = tableName;
+    _partitionId = partitionId;
+    _creationTime = DATE_FORMATTER.print(msSinceEpoch);
+    _prefix = prefix;
+    _suffix = suffix;
+    _segmentName = Joiner.on(SEPARATOR).join(prefix, tableName, partitionId, 
_creationTime, suffix);
+  }
+
+  /**
+   * Checks if the segment name is of the format: {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+   * @param segmentName
+   * @return boolean true if the segment name contains exactly four "__" separators; the individual parts
+   * (for example, that partitionId is numeric) are not validated
+   */
+  public static boolean isUploadedRealtimeSegmentName(String segmentName) {
+    int numSeparators = 0;
+    int index = 0;
+    while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
+      numSeparators++;
+      index += 2; // SEPARATOR.length()
+    }
+    return numSeparators == 4;
+  }
+
+  @Nullable
+  public static UploadedRealtimeSegmentName of(String segmentName) {
+    try {
+      return new UploadedRealtimeSegmentName(segmentName);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public int getPartitionId() {
+    return _partitionId;
+  }
+
+  /**
+   * Returns the creation time in the format yyyyMMdd'T'HHmm'Z'
+   * To be used only for human readability and not for any computation
+   * @return
+   */
+  public String getCreationTime() {
+    return _creationTime;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public String getPrefix() {
+    return _prefix;
+  }
+
+  public String getSuffix() {
+    return _suffix;
+  }
+
+  /**
+   * Compares the string representation of the segment name.
+   * @param other the object to be compared.
+   * @return
+   */
+  @Override
+  public int compareTo(UploadedRealtimeSegmentName other) {
+    Preconditions.checkState(_tableName.equals(other._tableName),
+        "Cannot compare segment names from different table: %s, %s", 
_segmentName, other.getSegmentName());
+    return _segmentName.compareTo(other._segmentName);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof UploadedRealtimeSegmentName)) {
+      return false;
+    }
+    UploadedRealtimeSegmentName that = (UploadedRealtimeSegmentName) o;
+    return _segmentName.equals(that._segmentName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_segmentName);
+  }
+
+  @Override
+  public String toString() {
+    return _segmentName;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
index 203cc249d7..1257bc0498 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
@@ -18,14 +18,29 @@
  */
 package org.apache.pinot.common.utils;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.fail;
 
 
 public class SegmentUtilsTest {
+  private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT = "testSegment";
+  private static final String PARTITION_COLUMN = "partitionColumn";
 
   @Test
   public void testGetSegmentCreationTimeMs() {
@@ -35,4 +50,49 @@ public class SegmentUtilsTest {
     segmentZKMetadata.setPushTime(2000L);
     assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 
2000L);
   }
+
+  @Test
+  public void testGetRealtimeSegmentPartitionIdFromZkMetadata() {
+
+    // mocks
+    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+    SegmentPartitionMetadata segmentPartitionMetadata = 
mock(SegmentPartitionMetadata.class);
+    HashMap<String, ColumnPartitionMetadata> columnPartitionMetadataMap = new 
HashMap<>();
+    HashSet<Integer> partitions = new HashSet<>();
+    partitions.add(3);
+    columnPartitionMetadataMap.put(PARTITION_COLUMN,
+        new ColumnPartitionMetadata("modulo", 8, partitions, new HashMap<>()));
+
+    
when(segmentPartitionMetadata.getColumnPartitionMap()).thenReturn(columnPartitionMetadataMap);
+    
when(segmentZKMetadata.getPartitionMetadata()).thenReturn(segmentPartitionMetadata);
+
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore zkHelixPropertyStore = 
mock(ZkHelixPropertyStore.class);
+    
when(helixManager.getHelixPropertyStore()).thenReturn(zkHelixPropertyStore);
+
+    // mock static ZKMetadataProvider.getSegmentZKMetadata
+    try (MockedStatic<ZKMetadataProvider> zkMetadataProviderMockedStatic = 
Mockito.mockStatic(
+        ZKMetadataProvider.class)) {
+      
when(ZKMetadataProvider.getSegmentZKMetadata(Mockito.any(ZkHelixPropertyStore.class),
 eq(TABLE_NAME_WITH_TYPE),
+          eq(SEGMENT))).thenReturn(segmentZKMetadata);
+
+      Integer partitionId =
+          SegmentUtils.getRealtimeSegmentPartitionId(SEGMENT, 
TABLE_NAME_WITH_TYPE, helixManager, PARTITION_COLUMN);
+
+      assertEquals(partitionId, 3);
+    }
+  }
+
+  @Test
+  void testGetRealtimeSegmentPartitionIdForUploadedRealtimeSegment() {
+    String segmentName = "uploaded__table_name__3__100__1716185755000";
+
+    try {
+      Integer partitionId =
+          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
"realtimeTableName", null, "partitionColumn");
+      assertEquals(partitionId, 3);
+    } catch (Exception e) {
+      fail("Exception should not be thrown");
+    }
+  }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java
new file mode 100644
index 0000000000..0cfb8a0196
--- /dev/null
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class UploadedRealtimeSegmentNameTest {
+
+  @Test
+  public void testSegmentNameParsing() {
+    String segmentName = "uploaded__table_name__1__20240530T0000Z__suffix";
+    UploadedRealtimeSegmentName uploadedRealtimeSegmentName = new 
UploadedRealtimeSegmentName(segmentName);
+
+    Assert.assertEquals(uploadedRealtimeSegmentName.getTableName(), 
"table_name");
+    Assert.assertEquals(uploadedRealtimeSegmentName.getPartitionId(), 1);
+    Assert.assertEquals(uploadedRealtimeSegmentName.getPrefix(), "uploaded");
+    Assert.assertEquals(uploadedRealtimeSegmentName.getSuffix(), "suffix");
+    Assert.assertEquals(uploadedRealtimeSegmentName.getCreationTime(), 
"20240530T0000Z");
+  }
+
+  @Test
+  public void testSegmentNameGeneration() {
+    UploadedRealtimeSegmentName uploadedRealtimeSegmentName =
+        new UploadedRealtimeSegmentName("tableName", 1, 1717027200000L, 
"uploaded", "2");
+    String expectedSegmentName = "uploaded__tableName__1__20240530T0000Z__2";
+
+    Assert.assertEquals(uploadedRealtimeSegmentName.getSegmentName(), 
expectedSegmentName);
+  }
+
+  @Test
+  public void testIsUploadedRealtimeSegmentName() {
+    String validSegmentName = "uploaded__table__0__20220101T0000Z__suffix";
+    
Assert.assertTrue(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(validSegmentName));
+
+    String invalidSegmentName = "uploaded__table__0__20220101T0000Z";
+    
Assert.assertFalse(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(invalidSegmentName));
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 8fc421f79e..37ff9bc47c 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.ingestion.batch.common;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +30,7 @@ import 
org.apache.pinot.segment.spi.creator.name.InputFileSegmentNameGenerator;
 import 
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -159,14 +161,25 @@ public class SegmentGenerationTaskRunner implements 
Serializable {
             
Boolean.parseBoolean(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)),
             IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
             
IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig), 
dateTimeFormatSpec,
-            segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX),
-            appendUUIDToSegmentName);
+            segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX), 
appendUUIDToSegmentName);
       case BatchConfigProperties.SegmentNameGeneratorType.INPUT_FILE:
         String inputFileUri = 
_taskSpec.getCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY);
         return new 
InputFileSegmentNameGenerator(segmentNameGeneratorConfigs.get(FILE_PATH_PATTERN),
-            segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE),
-            inputFileUri,
-            appendUUIDToSegmentName);
+            segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE), 
inputFileUri, appendUUIDToSegmentName);
+      case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+        Preconditions.checkState(segmentGeneratorConfig.getCreationTime() != 
null,
+            "Creation time must be set for uploaded realtime segment name 
generator");
+        
Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId() 
!= -1,
+            "Valid partition id must be set for uploaded realtime segment name 
generator");
+        long creationTime;
+        try {
+          creationTime = 
Long.parseLong(segmentGeneratorConfig.getCreationTime());
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Creation time must be a valid 
long value in segmentGeneratorConfig");
+        }
+        return new UploadedRealtimeSegmentNameGenerator(tableName,
+            segmentGeneratorConfig.getUploadedSegmentPartitionId(), 
creationTime,
+            segmentGeneratorConfig.getSegmentNamePrefix(), 
segmentGeneratorConfig.getSegmentNamePostfix());
       default:
         throw new UnsupportedOperationException("Unsupported segment name 
generator type: " + segmentNameGeneratorType);
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 5980cbbace..86146b8802 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
 import org.apache.pinot.segment.local.utils.HashUtils;
@@ -135,10 +136,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               // Update the record location when getting a newer comparison 
value, or the value is the same as the
               // current value, but the segment has a larger sequence number 
(the segment is newer than the current
               // segment).
-              if (comparisonResult > 0 || (comparisonResult == 0 && 
LLCSegmentName.isLLCSegment(segmentName)
-                  && LLCSegmentName.isLLCSegment(currentSegmentName)
-                  && LLCSegmentName.getSequenceNumber(segmentName) > 
LLCSegmentName.getSequenceNumber(
-                  currentSegmentName))) {
+              if (comparisonResult > 0 || (comparisonResult == 0 && 
shouldReplaceOnComparisonTie(segmentName,
+                  currentSegmentName, 
segment.getSegmentMetadata().getIndexCreationTime(),
+                  
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
                 replaceDocId(segment, validDocIds, queryableDocIds, 
currentSegment, currentDocId, newDocId, recordInfo);
                 return new RecordLocation(segment, newDocId, 
newComparisonValue);
               } else {
@@ -158,6 +158,47 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
     }
   }
 
+  /**
+   * <li> When the replacing segment and the current segment are both {@link LLCSegmentName}, the PK should
+   * resolve to the row in the segment with the higher sequence id.
+   * <li> If either or both are not LLC segments, resolve based on the creation time of the segments. If the
+   * creation times are the same, prefer the uploaded segment if the other is an LLCSegmentName.
+   * <li> If both are uploaded segments, prefer the standard UploadedRealtimeSegmentName; if still a tie, resolve
+   * to the current segment.
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the 
given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record 
in current segment
+   */
+  protected boolean shouldReplaceOnComparisonTie(String segmentName, String 
currentSegmentName,
+      long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+    // resolve using sequence id if both are LLCSegmentName
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    LLCSegmentName currentLLCSegmentName = 
LLCSegmentName.of(currentSegmentName);
+    if (llcSegmentName != null && currentLLCSegmentName != null) {
+      return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
+    }
+
+    // either or both are uploaded segments, prefer the latest segment
+    int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, 
currentSegmentCreationTimeMs);
+    if (creationTimeComparisonRes != 0) {
+      return creationTimeComparisonRes > 0;
+    }
+
+    // if both are uploaded segment, prefer standard 
UploadedRealtimeSegmentName, if still a tie, then resolve to
+    // current segment
+    if (UploadedRealtimeSegmentName.of(currentSegmentName) != null) {
+      return false;
+    }
+    if (UploadedRealtimeSegmentName.of(segmentName) != null) {
+      return true;
+    }
+    return false;
+  }
+
   @Override
   protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, 
Iterator<RecordInfo> recordInfoIterator) {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index d6f3107b74..4270e9547d 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -378,6 +379,143 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
   }
 
+  @Test
+  public void verifyAddReplaceUploadedSegment1()
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(HashFunction.NONE).build());
+    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, 
primaryKeys1, segmentMetadata, null);
+    List<RecordInfo> recordInfoList1;
+    // get recordInfo by iterating all records.
+    recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, 
recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
+
+    // Add the second segment of uploaded name format with same creation time
+    numRecords = 2;
+    primaryKeys = new int[]{0, 3};
+    timestamps = new int[]{100, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment2 =
+        mockUploadedImmutableSegment("2", validDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1000L);
+    List<RecordInfo> recordInfoList2;
+    // get recordInfo by iterating all records.
+    recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, 
recordInfoList2.iterator());
+    trackedSegments.add(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
+    // uploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+
+    // replace uploadedSegment2
+    ThreadSafeMutableRoaringBitmap newValidDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl newUploadedSegment2 =
+        mockUploadedImmutableSegment("2", newValidDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1020L);
+    upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, 
null, recordInfoList2.iterator(),
+        uploadedSegment2);
+    trackedSegments.add(newUploadedSegment2);
+    trackedSegments.remove(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
+    // newUploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
+    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
+
+    // add uploadedSegment3 with higher creation time than newUploadedSegment2
+    numRecords = 1;
+    primaryKeys = new int[]{0};
+    timestamps = new int[]{100};
+    ThreadSafeMutableRoaringBitmap validDocIds3 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment3 =
+        mockUploadedImmutableSegment("3", validDocIds3, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1040L);
+    List<RecordInfo> recordInfoList3;
+    // get recordInfo by iterating all records.
+    recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, 
recordInfoList3.iterator());
+
+    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
+    // newUploadedSegment2: 3 -> {1, 80}
+    // uploadedSegment3: 0 -> {0, 100}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
+    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1});
+    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+
+    // add uploadedSegment4 with higher creation time than segment 1 and same 
creation time as uploadedSegment3
+    numRecords = 2;
+    primaryKeys = new int[]{0, 1};
+    timestamps = new int[]{100, 120};
+    ThreadSafeMutableRoaringBitmap validDocIds4 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment4 =
+        mockUploadedImmutableSegment("4", validDocIds4, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1040L);
+    List<RecordInfo> recordInfoList4;
+    // get recordInfo by iterating all records.
+    recordInfoList4 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment4, validDocIds4, null, 
recordInfoList4.iterator());
+
+    // segment1: 2 -> {2, 100}
+    // newUploadedSegment2: 3 -> {1, 80}
+    // uploadedSegment3: 0 -> {0, 100}
+    // uploadedSegment4: 1 -> {1, 120}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2});
+    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1});
+    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new 
int[]{0});
+    assertEquals(validDocIds4.getMutableRoaringBitmap().toArray(), new 
int[]{1});
+
+    // remove segments
+    upsertMetadataManager.removeSegment(segment1);
+    upsertMetadataManager.removeSegment(uploadedSegment2);
+    upsertMetadataManager.removeSegment(newUploadedSegment2);
+    upsertMetadataManager.removeSegment(uploadedSegment3);
+    upsertMetadataManager.removeSegment(uploadedSegment4);
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
   private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction 
hashFunction, boolean enableSnapshot)
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -606,6 +744,33 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
         invocation -> 
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
     when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    return segment;
+  }
+
+  private static ImmutableSegmentImpl mockUploadedImmutableSegment(String 
suffix,
+      ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
+      List<PrimaryKey> primaryKeys, Long creationTimeMs) {
+    if (creationTimeMs == null) { // caller did not pin a creation time: fall back to the current time
+      creationTimeMs = System.currentTimeMillis();
+    }
+    ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+    
when(segment.getSegmentName()).thenReturn(getUploadedRealtimeSegmentName(creationTimeMs,
 suffix));
+    when(segment.getValidDocIds()).thenReturn(validDocIds);
+    when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+    DataSource dataSource = mock(DataSource.class);
+    when(segment.getDataSource(anyString())).thenReturn(dataSource); // same mocked data source for every column name
+    ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+    when(forwardIndex.isSingleValue()).thenReturn(true);
+    when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
+    when(forwardIndex.getInt(anyInt(), any())).thenAnswer( // arg 0 indexes primaryKeys; its first key value is returned
+        invocation -> 
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
+    when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs); // same time that was put in the name
+    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     return segment;
   }
 
@@ -656,6 +821,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber, 
System.currentTimeMillis()).toString();
   }
 
+  private static String getUploadedRealtimeSegmentName(long creationTimeMs, 
String suffix) { // name string for mocked uploaded segments of RAW_TABLE_NAME
+    return new UploadedRealtimeSegmentName(RAW_TABLE_NAME, 0, creationTimeMs, 
"uploaded", suffix).toString(); // NOTE(review): 0 is presumably the partition id, "uploaded" the prefix - confirm
+  }
+
   private static PrimaryKey makePrimaryKey(int value) {
     return new PrimaryKey(new Object[]{value});
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index c879c1be52..0742a814f0 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -40,6 +40,7 @@ import 
org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
 import 
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -107,6 +108,8 @@ public class SegmentGeneratorConfig implements Serializable 
{
   private String _creatorVersion = null;
   private SegmentNameGenerator _segmentNameGenerator = null;
   private SegmentPartitionConfig _segmentPartitionConfig = null;
+
+  private int _uploadedSegmentPartitionId = -1;
   private int _sequenceId = -1;
   private TimeColumnType _timeColumnType = TimeColumnType.EPOCH;
   private DateTimeFormatSpec _dateTimeFormatSpec = null;
@@ -463,6 +466,9 @@ public class SegmentGeneratorConfig implements Serializable 
{
     _segmentTimeColumnName = timeColumnName;
   }
 
+  public int getUploadedSegmentPartitionId() {
+    return _uploadedSegmentPartitionId; // -1 is the field's initial value, i.e. no external partition id was set
+  }
   public int getSequenceId() {
     return _sequenceId;
   }
@@ -475,6 +481,13 @@ public class SegmentGeneratorConfig implements 
Serializable {
     return _fstTypeForFSTIndex;
   }
 
+  /**
+   * Sets the partitionId for an externally partitioned segment upload; -1 (the initial value) means "not set".
+   */
+  public void setUploadedSegmentPartitionId(int partitionId) {
+    _uploadedSegmentPartitionId = partitionId;
+  }
+
   /**
    * This method should be used instead of setPostfix if you are adding a 
sequence number.
    */
@@ -581,6 +594,9 @@ public class SegmentGeneratorConfig implements Serializable 
{
             IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig),
             
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig), 
_dateTimeFormatSpec,
             _segmentNamePostfix);
+      case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+        return new UploadedRealtimeSegmentNameGenerator(_rawTableName, 
_uploadedSegmentPartitionId,
+            Long.parseLong(_segmentCreationTime), _segmentNamePrefix, 
_segmentNamePostfix);
       default:
         return new SimpleSegmentNameGenerator(_segmentNamePrefix != null ? 
_segmentNamePrefix : _rawTableName,
             _segmentNamePostfix);
@@ -600,6 +616,11 @@ public class SegmentGeneratorConfig implements 
Serializable {
       return BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE;
     }
 
+    // if segment is externally partitioned
+    if (_uploadedSegmentPartitionId != -1) {
+      return BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME;
+    }
+
     return BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
   }
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
index 9fd6c97f35..498079da82 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
@@ -32,6 +32,7 @@ public class SegmentNameGeneratorFactory {
   public static final String FIXED_SEGMENT_NAME_GENERATOR = "fixed";
   public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
   public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = 
"normalizeddate";
+  public static final String UPLOADED_REALTIME = "uploadedrealtime";
 
   private SegmentNameGeneratorFactory() {
   }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java
new file mode 100644
index 0000000000..223649734d
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Generates segment names that follow the UploadedRealtimeSegmentName convention:
+ * {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * <p>This convention names segments that are uploaded to a realtime table (see UploadedRealtimeSegmentName); its
+ * semantics are similar to LLCSegmentName. Prefer it when the generated segments are already partitioned and must be
+ * assigned by partitionId so that they stay consistent with the stream partitioning of upsert tables. When no suffix
+ * is configured, the sequenceId passed to generateSegmentName takes the place of {suffix}.
+ */
+public class UploadedRealtimeSegmentNameGenerator implements 
SegmentNameGenerator {
+
+  private static final String DELIMITER = "__";
+  private final String _tableName;
+  private final int _partitionId;
+  private final long _creationTimeMillis;
+  private final String _prefix;
+
+  // Optional; when null, generateSegmentName falls back to the sequenceId as the last name component
+  @Nullable
+  private final String _suffix;
+
+  /**
+   * Creates an UploadedRealtimeSegmentNameGenerator; invalid arguments fail a Preconditions.checkArgument check.
+   * @param tableName table name component; must not be blank or contain the "__" delimiter
+   * @param partitionId partition id component; written into the name as-is (no range check is done here)
+   * @param creationTimeMillis creation time component; must be greater than 0
+   * @param prefix prefix component; must not be blank or contain the "__" delimiter
+   * @param suffix optional; when null the sequenceId is used instead, else must not be blank or contain "__"
+   */
+  public UploadedRealtimeSegmentNameGenerator(String tableName, int 
partitionId, long creationTimeMillis, String prefix,
+      @Nullable String suffix) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(tableName) && !tableName.contains(DELIMITER) && 
StringUtils.isNotBlank(prefix)
+            && !prefix.contains(DELIMITER), "Invalid tableName or prefix for 
UploadedRealtimeSegmentNameGenerator");
+    Preconditions.checkArgument(creationTimeMillis > 0, "Creation time must be 
greater than 0");
+    if (suffix != null) {
+      Preconditions.checkArgument(StringUtils.isNotBlank(suffix) && 
!suffix.contains(DELIMITER),
+          "Invalid suffix for UploadedRealtimeSegmentNameGenerator");
+    }
+    _tableName = tableName;
+    _partitionId = partitionId;
+    _creationTimeMillis = creationTimeMillis;
+    _prefix = prefix;
+    _suffix = suffix;
+  }
+
+  @Override
+  public String generateSegmentName(int sequenceId, @Nullable Object 
minTimeValue, @Nullable Object maxTimeValue) {
+    return Joiner.on(DELIMITER).join(_prefix, _tableName, _partitionId, 
_creationTimeMillis,
+        StringUtils.isBlank(_suffix) ? sequenceId : _suffix); // minTimeValue and maxTimeValue are not used
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder =
+        new StringBuilder("UploadedRealtimeSegmentNameGenerator: 
tableName=").append(_tableName);
+    stringBuilder.append(", prefix=").append(_prefix);
+    stringBuilder.append(", partitionId=").append(_partitionId);
+    if (_suffix != null) {
+      stringBuilder.append(", suffix=").append(_suffix);
+    }
+    stringBuilder.append(", creationTimeMillis=").append(_creationTimeMillis);
+    return stringBuilder.toString();
+  }
+}
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
index 36a7a0b0dd..39e4ce9c10 100644
--- 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
 import 
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -140,6 +141,20 @@ public class SegmentGeneratorConfigTest {
     Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator() 
instanceof SimpleSegmentNameGenerator);
     
Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator().toString().contains("tableName=testTable"));
 
+    // Table config is externally partitioned
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("test").build();
+
+    segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setUploadedSegmentPartitionId(0);
+    segmentGeneratorConfig.setCreationTime("1234567890");
+    segmentGeneratorConfig.setSegmentNamePrefix("prefix");
+    segmentGeneratorConfig.setSegmentNamePostfix("5");
+
+    Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator() 
instanceof UploadedRealtimeSegmentNameGenerator);
+    Assert.assertTrue(
+        
segmentGeneratorConfig.getSegmentNameGenerator().toString().contains("tableName=test"));
+
     // Table config has no time column defined
     tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java
similarity index 55%
copy from 
pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
copy to 
pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java
index 203cc249d7..71ddd12a83 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java
@@ -16,23 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.utils;
+package org.apache.pinot.segment.spi.creator.name;
 
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.*;
 
 
-public class SegmentUtilsTest {
-  private static final String SEGMENT = "testSegment";
+public class UploadedRealtimeSegmentNameGeneratorTest {
 
   @Test
-  public void testGetSegmentCreationTimeMs() {
-    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(SEGMENT);
-    segmentZKMetadata.setCreationTime(1000L);
-    assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 
1000L);
-    segmentZKMetadata.setPushTime(2000L);
-    assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 
2000L);
+  public void testGenerateSegmentName() {
+    String tableName = "tableName";
+    int partitionId = 1;
+    long creationTimeMillis = 1234567890L;
+    int sequenceId = 2;
+
+    UploadedRealtimeSegmentNameGenerator generator =
+        new UploadedRealtimeSegmentNameGenerator(tableName, partitionId, 
creationTimeMillis, "prefix", "suffix");
+    String expectedSegmentName = "prefix__tableName__1__1234567890__suffix";
+
+    String actualSegmentName = generator.generateSegmentName(sequenceId, null, 
null);
+
+    assertEquals(actualSegmentName, expectedSegmentName);
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 0782b7a8ec..efb11bc633 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -74,6 +74,7 @@ public class BatchConfigProperties {
     public static final String SIMPLE = "simple";
     public static final String NORMALIZED_DATE = "normalizedDate";
     public static final String FIXED = "fixed";
+    public static final String UPLOADED_REALTIME = "uploadedRealtime";
     public static final String INPUT_FILE = "inputFile";
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to