Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-10 Thread via GitHub


klsince merged PR #13107:
URL: https://github.com/apache/pinot/pull/13107


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-10 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1633513536


##
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##
@@ -600,6 +616,11 @@ public String inferSegmentNameGeneratorType() {
   return BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE;
 }
 
+// if segment is externally partitioned
+if (_uploadedSegmentPartitionId != -1) {

Review Comment:
   my bad, I meant a realtime table, but anyway, not blocking for this PR






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-10 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1632638123


##
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##
@@ -600,6 +616,11 @@ public String inferSegmentNameGeneratorType() {
   return BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE;
 }
 
+// if segment is externally partitioned
+if (_uploadedSegmentPartitionId != -1) {

Review Comment:
   Partitioning also applies to non-upsert realtime tables (for query routing). This check would restrict it to upsert-only tables.
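
   To make the trade-off concrete, here is a minimal sketch of the inference order visible in the diff, without an upsert check. Only the `-1` sentinel and the position of the check after the `NORMALIZED_DATE` branch come from the diff; the class `GeneratorTypeSketch`, its enum values `UPLOADED_REALTIME` and `SIMPLE`, and the fallback branch are hypothetical stand-ins, not Pinot's API.

```java
// Illustrative sketch only; every name here is hypothetical and not Pinot's API.
final class GeneratorTypeSketch {
  // Sentinel meaning "no external partition id supplied" (mirrors the diff's `!= -1` check).
  static final int UNSET_PARTITION_ID = -1;

  enum GeneratorType { NORMALIZED_DATE, UPLOADED_REALTIME, SIMPLE }

  static GeneratorType infer(boolean usesNormalizedDate, int uploadedSegmentPartitionId) {
    if (usesNormalizedDate) {
      return GeneratorType.NORMALIZED_DATE;
    }
    // No upsert check here: any table with an externally supplied partition id qualifies,
    // which keeps partition-aware routing available to non-upsert realtime tables.
    if (uploadedSegmentPartitionId != UNSET_PARTITION_ID) {
      return GeneratorType.UPLOADED_REALTIME;
    }
    return GeneratorType.SIMPLE; // assumed fallback, not taken from the PR
  }
}
```

   Adding an upsert condition to the second branch would send partitioned, non-upsert tables to the fallback instead.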






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-08 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1631741645


##
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java:
##
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Implementation for generating segment names of the format UploadedRealtimeSegmentName:
+ * {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * This naming convention is adopted to represent uploaded segments to a realtime table see
+ * UploadedRealtimeSegmentName. The semantic is similar
+ * to LLCSegmentName. This naming convention should be preferred when the data is partitioned in offline generated
+ * segments and should be assigned based same as realtime segments of the table generated from stream.
+ */
+public class UploadedRealtimeSegmentNameGenerator implements SegmentNameGenerator {
+
+  private static final String DELIMITER = "__";
+  private final String _tableName;
+  private final int _partitionId;
+  // creation time must be in long and milliseconds since epoch to be consistent with creation.meta time for valid
+  // comparison in segment replace flow.
+  private final long _creationTimeMillis;
+  private final String _prefix;
+
+  // if suffix is not set then sequenceId is used as segment name suffix
+  @Nullable
+  private final String _suffix;
+
+  /**
+   * Creates a UploadedRealtimeSegmentNameGenerator
+   * @param tableName
+   * @param partitionId
+   * @param creationTimeMillis
+   * @param prefix
+   * @param suffix optional field for generator, if not specified then sequenceId is used as suffix
+   */
+  public UploadedRealtimeSegmentNameGenerator(String tableName, int partitionId, long creationTimeMillis, String prefix,
+  @Nullable String suffix) {
+Preconditions.checkArgument(
+StringUtils.isNotBlank(tableName) && !tableName.contains(DELIMITER) && StringUtils.isNotBlank(prefix)
+&& !prefix.contains(DELIMITER), "Invalid tableName or prefix for UploadedRealtimeSegmentNameGenerator");
+Preconditions.checkArgument(creationTimeMillis > 0, "Creation time must be greater than 0");
+if (suffix != null) {
+  Preconditions.checkArgument(suffix.length() > 0 && !suffix.contains(DELIMITER),

Review Comment:
   nit: `StringUtils.isNotBlank(suffix)`
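
   A small sketch of why the nit matters: a `length() > 0` check accepts a whitespace-only suffix, while a not-blank check rejects it. The class `SuffixCheckSketch` and its methods are hypothetical; `isNotBlank` below is a local stand-in that is assumed to mirror the commons-lang3 semantics (non-null and at least one non-whitespace character), so that the example has no external dependencies.

```java
// Illustrative sketch only; class and method names are hypothetical.
final class SuffixCheckSketch {
  static final String DELIMITER = "__";

  // Local stand-in for StringUtils.isNotBlank: non-null with at least one non-whitespace char.
  static boolean isNotBlank(String s) {
    if (s == null) {
      return false;
    }
    for (int i = 0; i < s.length(); i++) {
      if (!Character.isWhitespace(s.charAt(i))) {
        return true;
      }
    }
    return false;
  }

  // The check as written in the diff: only rejects the empty string and the delimiter.
  static boolean lengthCheck(String suffix) {
    return suffix.length() > 0 && !suffix.contains(DELIMITER);
  }

  // The check suggested in the review: also rejects whitespace-only suffixes.
  static boolean blankCheck(String suffix) {
    return isNotBlank(suffix) && !suffix.contains(DELIMITER);
  }
}
```

   With these definitions, `lengthCheck("   ")` passes while `blankCheck("   ")` fails, so only the second form keeps whitespace-only suffixes out of segment names.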



##
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##
@@ -600,6 +616,11 @@ public String inferSegmentNameGeneratorType() {
   return BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE;
 }
 
+// if segment is externally partitioned
+if (_uploadedSegmentPartitionId != -1) {

Review Comment:
   may also want to check if the table is an upsert table?






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-07 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1631688317


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,47 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  If either or both are not LLC segment, then resolve based on creation time of segment. If creation time is
+   * same then prefer uploaded segment if other is LLCSegmentName
+   *  If both are uploaded segment, prefer standard UploadedRealtimeSegmentName, if still a tie, then resolve to
+   * current segment.
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record in current segment
+   */
+  protected boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+// resolve using sequence id if both are LLCSegmentName
+LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+LLCSegmentName currentLLCSegmentName = LLCSegmentName.of(currentSegmentName);
+if (llcSegmentName != null && currentLLCSegmentName != null) {
+  return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber();
+}
+
+// either or both are uploaded segments, prefer the latest segment
+int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);
+if ((llcSegmentName == null || currentLLCSegmentName == null) && creationTimeComparisonRes != 0) {

Review Comment:
   This is true. I tried to be extra careful, and this was the side effect.
   
   Removed this and tested with UTs. The behaviour is as expected. PTAL at the UTs as well. Updating.
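
   For readers following the thread, here is a simplified sketch of the tie-break order described in the Javadoc above, with the redundant null check removed. It is not the merged code: `TieBreakSketch` and `llcSequenceOrMinusOne` are hypothetical, the four-part `{tableName}__{partitionId}__{sequenceNumber}__{creationTime}` LLC layout is an assumption used in place of `LLCSegmentName.of`, and the "prefer standard UploadedRealtimeSegmentName" step is omitted for brevity.

```java
// Illustrative sketch only; not the code merged in the PR.
final class TieBreakSketch {
  // Hypothetical stand-in for LLCSegmentName.of(...): returns the sequence number when the
  // name has the assumed 4-part LLC layout, or -1 when it does not look like an LLC name.
  static int llcSequenceOrMinusOne(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return -1;
    }
    try {
      Integer.parseInt(parts[1]); // partition id must be numeric
      return Integer.parseInt(parts[2]);
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  static boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
      long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
    int seq = llcSequenceOrMinusOne(segmentName);
    int currentSeq = llcSequenceOrMinusOne(currentSegmentName);
    // Both are LLC segments: the higher sequence number wins.
    if (seq >= 0 && currentSeq >= 0) {
      return seq > currentSeq;
    }
    // At least one uploaded segment: prefer the later creation time.
    int cmp = Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);
    if (cmp != 0) {
      return cmp > 0;
    }
    // Same creation time: prefer an uploaded segment over an LLC segment.
    if (seq < 0 && currentSeq >= 0) {
      return true;
    }
    // Still a tie: keep the current segment.
    return false;
  }
}
```

   Because the first branch returns whenever both names are LLC names, every later line already implies that at least one segment is uploaded, which is why the extra null check could be dropped.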






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-07 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1631496448


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * This naming convention is adopted to represent a segment uploaded to a realtime table. The naming
+ * convention has been kept semantically similar to {@link LLCSegmentName} but differs in following ways:
+ *
+ *  prefix to quickly identify the type/source of segment e.g. "uploaded"/"minion"
+ *  tableName to be same as the table name of segment
+ *  partitionId to identify the right parition for upsert table segment table assignment.
+ *  creationTime creation time of segment of the format MMdd'T'HHmm'Z'
+ *  suffix to deduplicate segment names created at the same time
+ *
+ * Use {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} to generate segment names.
+ */
+public class UploadedRealtimeSegmentName implements Comparable {
+
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "MMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _prefix;
+  private final String _tableName;
+  private final int _partitionId;
+  private final String _creationTime;
+  private final String _segmentName;
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+// split the segment name by the separator and get creation time, sequence id, partition id and table name from
+// the end and validate segment name starts with prefix uploaded_
Review Comment:
   this comment might need an update as well



##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,47 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  If either or both are not LLC segment, then resolve based on 
creation time of segment. If creation time is
+   * same then prefer uploaded segment if other is LLCSegmentName
+   *  If both are uploaded segment, prefer standard 
UploadedRealtimeSegmentName, if still a tie, then resolve to
+   * current segment.
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the 
given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record 
in current segment
+   */
+  protected boolean shouldReplaceOnComparisonTie(String segmentName, String 
currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+// resolve using sequence id if both are LLCSegmentName
+LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+LLCSegmentName currentLLCSegmentName = 
LLCSegmentName.of(currentSegmentName);
+if (llcSegmentName != null && currentLLCSegmentName != null) {
+  return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
+}
+
+// either or both are uploaded segments, prefer the latest segment
+int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, 
currentSegmentCreationTimeMs);
+if ((llcSegmentName == null || 

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-07 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1630674736


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,45 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher creation time
+   *  For other cases resolve based on creation time of segment. In case 
the creation time is same, give
+   * preference to an uploaded segment. A segment which is not LLCSegment can 
be assumed to be uploaded segment and
+   * is given preference.
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the 
given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record 
in current segment
+   */
+  protected boolean shouldReplaceOnComparisonTie(String segmentName, String 
currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+LLCSegmentName currentLLCSegmentName = 
LLCSegmentName.of(currentSegmentName);
+if (llcSegmentName != null && currentLLCSegmentName != null) {
+  return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
+}
+
+int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, 
currentSegmentCreationTimeMs);

Review Comment:
   Thanks for simplifying this
   
   > // favor the first writer
   > return false;
   
   This may behave inconsistently (there can be two writers, and they may not always sync and write in the same order), but I don't have a strong preference. It is fine to keep it as you suggested, since it does not modify the existing behaviour.
   
   For later, we can always rely on a criterion based on segment metadata/name to be consistent, e.g. we can compare the lexical order of segment names, which would also encourage use of the right naming conventions.
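
   A sketch of the lexical-order idea floated here as a possible follow-up, not something this PR implements. `LexicalTieBreakSketch` and `shouldReplaceOnFullTie` are hypothetical names, and plain `String.compareTo` ordering is an assumption about what "lexical order" would mean.

```java
// Illustrative sketch only; a possible follow-up, not behaviour introduced by the PR.
final class LexicalTieBreakSketch {
  // When every other criterion ties, let the lexically greater segment name win.
  // Unlike "favor the first writer", the outcome does not depend on arrival order.
  static boolean shouldReplaceOnFullTie(String segmentName, String currentSegmentName) {
    return segmentName.compareTo(currentSegmentName) > 0;
  }
}
```

   For two distinct names, exactly one of the two call orders returns true, so two writers that see the segments in different orders would still converge on the same winner.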






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-06 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1630674736


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,45 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher creation time
+   *  For other cases resolve based on creation time of segment. In case 
the creation time is same, give
+   * preference to an uploaded segment. A segment which is not LLCSegment can 
be assumed to be uploaded segment and
+   * is given preference.
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the 
given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record 
in current segment
+   */
+  protected boolean shouldReplaceOnComparisonTie(String segmentName, String 
currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+LLCSegmentName currentLLCSegmentName = 
LLCSegmentName.of(currentSegmentName);
+if (llcSegmentName != null && currentLLCSegmentName != null) {
+  return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
+}
+
+int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, 
currentSegmentCreationTimeMs);

Review Comment:
   > // favor the first writer
   > return false;
   
   This may behave inconsistently (there can be two writers, and they may not always sync and write in the same order), but I don't have a strong preference. It is fine to keep it as you suggested, since it does not modify the existing behaviour.
   
   For later, we can always rely on a criterion based on segment metadata/name to be consistent, e.g. we can compare the lexical order of segment names, which would also encourage use of the right naming conventions.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-06 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1630664998


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept semantically similar to {@link LLCSegmentName} but 
differs in following ways:
+ *
+ *  prefix to quickly identify the type/source of segment e.g. 
"uploaded"/"minion"
+ *  tableName to be same as the table name of segment
+ *  partitionId to identify the right parition for upsert table segment 
table assignment.
+ *  creationTime creation time of segment of the format MMdd'T'HHmm'Z'
+ *  suffix to deduplicate segment names created at the same time
+ *
+ * Use {@link 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} 
to generate segment names.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable {
+
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "MMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _prefix;
+  private final String _tableName;
+  private final int _partitionId;
+  private final String _creationTime;
+  private final String _segmentName;
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+// split the segment name by the separator and get creation time, sequence 
id, partition id and table name from
+// the end and validate segment name starts with prefix uploaded_
+try {
+  String[] parts = StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR);
+  Preconditions.checkState(parts.length == 5,
+  "Uploaded segment name must be of the format 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}");
+  _prefix = parts[0];
+  _tableName = parts[1];
+  _partitionId = Integer.parseInt(parts[2]);
+  _creationTime = parts[3];
+  _suffix = parts[4];
+  _segmentName = segmentName;
+} catch (NumberFormatException e) {
+  throw new IllegalArgumentException("Invalid segment name: " + 
segmentName, e);
+}
+  }
+
+  /**
+   * Constructor for UploadedRealtimeSegmentName.
+   * @param tableName
+   * @param partitionId
+   * @param msSinceEpoch
+   * @param prefix
+   * @param suffix
+   */
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, long 
msSinceEpoch, String prefix,
+  String suffix) {
+Preconditions.checkArgument(
+StringUtils.isNotBlank(tableName) && StringUtils.isNotBlank(prefix) && 
StringUtils.isNotBlank(suffix),

Review Comment:
   Ack, will also add this to UploadedRealtimeSegmentNameGenerator as a check.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-06 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1630365372


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept semantically similar to {@link LLCSegmentName} but 
differs in following ways:
+ *
+ *  prefix to quickly identify the type/source of segment e.g. 
"uploaded"/"minion"
+ *  tableName to be same as the table name of segment
+ *  partitionId to identify the right parition for upsert table segment 
table assignment.
+ *  creationTime creation time of segment of the format MMdd'T'HHmm'Z'
+ *  suffix to deduplicate segment names created at the same time
+ *
+ * Use {@link 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} 
to generate segment names.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable {
+
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "MMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _prefix;
+  private final String _tableName;
+  private final int _partitionId;
+  private final String _creationTime;
+  private final String _segmentName;
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+// split the segment name by the separator and get creation time, sequence 
id, partition id and table name from
+// the end and validate segment name starts with prefix uploaded_
+try {
+  String[] parts = StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR);
+  Preconditions.checkState(parts.length == 5,
+  "Uploaded segment name must be of the format 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}");
+  _prefix = parts[0];
+  _tableName = parts[1];
+  _partitionId = Integer.parseInt(parts[2]);
+  _creationTime = parts[3];
+  _suffix = parts[4];
+  _segmentName = segmentName;
+} catch (NumberFormatException e) {
+  throw new IllegalArgumentException("Invalid segment name: " + 
segmentName, e);
+}
+  }
+
+  /**
+   * Constructor for UploadedRealtimeSegmentName.
+   * @param tableName
+   * @param partitionId
+   * @param msSinceEpoch
+   * @param prefix
+   * @param suffix
+   */
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, long 
msSinceEpoch, String prefix,
+  String suffix) {
+Preconditions.checkArgument(
+StringUtils.isNotBlank(tableName) && StringUtils.isNotBlank(prefix) && 
StringUtils.isNotBlank(suffix),

Review Comment:
   as in LLCSegmentName, we can also check that tableName/suffix/prefix don't contain SEPARATOR
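
   The reason for this check can be sketched as a round trip: a name is built by joining five components with `__` and parsed by splitting on `__`, so a component that itself contains the separator would produce more than five parts. `SeparatorCheckSketch`, `join`, and `parse` are hypothetical helpers, and the creation-time string is treated as an opaque, illustrative value.

```java
// Illustrative sketch only; helper names are hypothetical and not Pinot's API.
final class SeparatorCheckSketch {
  static final String SEPARATOR = "__";

  // Reject components that are empty or would break the 5-part layout when split.
  static void checkComponent(String name, String value) {
    if (value == null || value.trim().isEmpty() || value.contains(SEPARATOR)) {
      throw new IllegalArgumentException("Invalid " + name + ": " + value);
    }
  }

  static String join(String prefix, String tableName, int partitionId, String creationTime, String suffix) {
    checkComponent("prefix", prefix);
    checkComponent("tableName", tableName);
    checkComponent("suffix", suffix);
    return String.join(SEPARATOR, prefix, tableName, Integer.toString(partitionId), creationTime, suffix);
  }

  static String[] parse(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR);
    if (parts.length != 5) {
      throw new IllegalArgumentException("Expected 5 parts but found " + parts.length + ": " + segmentName);
    }
    return parts;
  }
}
```

   Without the `contains(SEPARATOR)` guard, a table name such as `my__Table` would yield a six-part name that the parsing side rejects.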



##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-05 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1628157120


##
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java:
##
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
+
+/**
+ * Implementation for generating segment names of the format 
UploadedRealtimeSegmentName:
+ * 
uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}
+ *
+ * This naming convention is adopted to represent uploaded segments to a 
realtime table. The semantic is similar
+ * to LLCSegmentName. Scenarios where this naming convention can be preferred 
is:
+ *  Generating segments from a batch workload
+ *  Minion based segment transformations
+ */
+public class UploadedRealtimeSegmentNameGenerator implements 
SegmentNameGenerator {
+
+  private static final String SEGMENT_NAME_PREFIX = "uploaded";
+  private static final String DELIMITER = "__";
+  private final String _tableName;
+  private final int _partitionId;
+  // creation time must be in long and milliseconds since epoch to be 
consistent with creation.meta time for valid
+  // comparison in segment replace flow.
+  private final long _creationTimeMillis;
+  @Nullable
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentNameGenerator(String tableName, int 
partitionId, long creationTimeMillis,
+  String suffix) {
+Preconditions.checkState(creationTimeMillis > 0, "Creation time must be 
positive");

Review Comment:
   bump ^ as this might be missed.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-04 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1626373676


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,49 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher creation time followed by 
sequence id.
+   *  For other cases resolve based on creation time of segment. In case 
the creation time is same, give
+   * preference to an uplaoded segment. A segment which is not LLCSegment can 
be assumed to be uploaded segment and

Review Comment:
   typo: uplaoded



##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,49 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher creation time followed by 
sequence id.
+   *  For other cases resolve based on creation time of segment. In case 
the creation time is same, give
+   * preference to an uplaoded segment. A segment which is not LLCSegment can 
be assumed to be uploaded segment and
+   * is given preference.
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the 
given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record 
in current segment
+   */
+  private boolean shouldReplaceOnComparisonTie(String segmentName, String 
currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+LLCSegmentName currentLLCSegmentName = 
LLCSegmentName.of(currentSegmentName);
+if (llcSegmentName != null && currentLLCSegmentName != null) {
+  return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
+}
+
+int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, 
currentSegmentCreationTimeMs);
+
+UploadedRealtimeSegmentName uploadedSegmentName = 
UploadedRealtimeSegmentName.of(segmentName);
+UploadedRealtimeSegmentName currentUploadedSegmentName = 
UploadedRealtimeSegmentName.of(currentSegmentName);
+if (uploadedSegmentName != null && currentUploadedSegmentName != null) {
+  if (creationTimeComparisonRes == 0) {
+return uploadedSegmentName.getSequenceId() > 
currentUploadedSegmentName.getSequenceId();
+  } else {
+return creationTimeComparisonRes > 0;
+  }
+}
+
+if (creationTimeComparisonRes == 0) {
+  return llcSegmentName == null || uploadedSegmentName != null;

Review Comment:
   nit: comment that `uploadedSegmentName != null` is to favor the segment with 
formatted name



##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names 

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-04 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1625655826


##
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##
@@ -378,6 +379,115 @@ public void testAddReplaceRemoveSegmentWithRecordDelete()
 verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
   }
 
+  @Test
+  public void verifyAddReplaceUploadedSegment()
+  throws IOException {
+ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+_contextBuilder.setHashFunction(HashFunction.NONE).build());
+Map recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+Set trackedSegments = upsertMetadataManager._trackedSegments;
+
+// Add the first segment
+int numRecords = 6;
+int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+List primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
+ImmutableSegmentImpl segment1 =
+mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, 
primaryKeys1, segmentMetadata, null);
+List recordInfoList1;
+// get recordInfo by iterating all records.
+recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+upsertMetadataManager.addSegment(segment1, validDocIds1, null, 
recordInfoList1.iterator());
+trackedSegments.add(segment1);
+// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+assertEquals(recordLocationMap.size(), 3);
+checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
+
+// Add the second segment of uploaded name format with higher creation time
+numRecords = 5;
+primaryKeys = new int[]{0, 1, 2, 3, 0};
+timestamps = new int[]{100, 100, 120, 80, 80};
+ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+ImmutableSegmentImpl uploadedSegment2 =
+mockUploadedImmutableSegment(2, validDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1010L);
+List recordInfoList2;
+// get recordInfo by iterating all records.
+recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, 
recordInfoList2.iterator());
+trackedSegments.add(uploadedSegment2);
+
+// segment1: 1 -> {4, 120}
+// uploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+assertEquals(recordLocationMap.size(), 4);
+checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 2, uploadedSegment2, 2, 120, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 3, 80, 
HashFunction.NONE);
+assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+
+// replace uploadedSegment2 with higher creation time
+ThreadSafeMutableRoaringBitmap newValidDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+ImmutableSegmentImpl newUploadedSegment2 =
+mockUploadedImmutableSegment(2, newValidDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1020L);
+upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, 
null, recordInfoList2.iterator(),
+uploadedSegment2);
+trackedSegments.add(newUploadedSegment2);
+trackedSegments.remove(uploadedSegment2);
+
+// segment1: 1 -> {4, 120}
+// newUploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+assertEquals(recordLocationMap.size(), 4);
+checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 2, newUploadedSegment2, 2, 120, 
HashFunction.NONE);
+checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 3, 80, 
HashFunction.NONE);
+assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-04 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1625400329


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,51 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   *  When either is of type {@link UploadedRealtimeSegmentName} then 
resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *

Review Comment:
   > if both Uploaded, compare ctime, then seqId (btw, I refined this point 
based on one unit test in this PR. I commented on that unit test as well.)
   
   This makes sense from the backfill perspective (corrective or bootstrap 
backfill): a batch job runs at some time T, and the segments for that run are 
generated sequentially.
   
   The naming convention Javadocs can call this out. Let me update the UTs 
accordingly.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-03 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1625104449


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,51 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   *  When either is of type {@link UploadedRealtimeSegmentName} then 
resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *

Review Comment:
   sgtm, basically:
   1. if both LLC, compare seqId
   2. if both Uploaded, compare ctime, then seqId (btw, I refined this point 
based on one unit test in this PR. I commented on that unit test as well.)
   3. otherwise, simply check ctimes. If ctimes are same, favor uploaded segment
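
The three rules above can be sketched roughly as follows. This is a simplified stand-in, not the PR's `shouldReplaceOnComparisonTie`: `TieBreakSketch`, `Kind` and `Seg` are hypothetical, and the name kind is passed in directly instead of being parsed from the segment name.

```java
/** Hypothetical, simplified sketch of the tie-breaking rules discussed in this thread. */
final class TieBreakSketch {
  /** How a segment name was recognised: LLC format, uploaded format, or neither. */
  enum Kind { LLC, UPLOADED, OTHER }

  /** Minimal description of a segment for comparison purposes. */
  static final class Seg {
    final Kind kind;
    final int seqId;
    final long ctimeMs;

    Seg(Kind kind, int seqId, long ctimeMs) {
      this.kind = kind;
      this.seqId = seqId;
      this.ctimeMs = ctimeMs;
    }
  }

  /** Returns true if the record in the replacing segment should win the tie. */
  static boolean shouldReplace(Seg replacing, Seg current) {
    // Rule 1: both LLC, compare sequence ids only.
    if (replacing.kind == Kind.LLC && current.kind == Kind.LLC) {
      return replacing.seqId > current.seqId;
    }
    int ctimeCmp = Long.compare(replacing.ctimeMs, current.ctimeMs);
    // Rule 2: both uploaded, compare creation time first, then sequence id.
    if (replacing.kind == Kind.UPLOADED && current.kind == Kind.UPLOADED) {
      return ctimeCmp != 0 ? ctimeCmp > 0 : replacing.seqId > current.seqId;
    }
    // Rule 3: otherwise compare creation times; on equal times favor the
    // uploaded side, i.e. replace unless the replacing segment is an LLC segment.
    if (ctimeCmp != 0) {
      return ctimeCmp > 0;
    }
    return replacing.kind != Kind.LLC;
  }

  public static void main(String[] args) {
    Seg older = new Seg(Kind.UPLOADED, 9, 1010L);
    Seg newer = new Seg(Kind.UPLOADED, 0, 1020L);
    System.out.println(shouldReplace(newer, older));
  }
}
```

Treating any non-LLC name as an uploaded segment in rule 3 follows the Javadoc quoted in the later revision of this diff; whether that holds for every deployment is an assumption of the sketch.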






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-03 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1625083242


##
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java:
##
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
+
+/**
+ * Implementation for generating segment names of the format 
UploadedRealtimeSegmentName:
+ * 
uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}
+ *
+ * This naming convention is adopted to represent uploaded segments to a 
realtime table. The semantic is similar
+ * to LLCSegmentName. Scenarios where this naming convention can be preferred 
is:
+ *  Generating segments from a batch workload
+ *  Minion based segment transformations
+ */
+public class UploadedRealtimeSegmentNameGenerator implements 
SegmentNameGenerator {
+
+  private static final String SEGMENT_NAME_PREFIX = "uploaded";
+  private static final String DELIMITER = "__";
+  private final String _tableName;
+  private final int _partitionId;
+  // creation time must be in long and milliseconds since epoch to be 
consistent with creation.meta time for valid
+  // comparison in segment replace flow.
+  private final long _creationTimeMillis;
+  @Nullable
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentNameGenerator(String tableName, int 
partitionId, long creationTimeMillis,
+  String suffix) {
+Preconditions.checkState(creationTimeMillis > 0, "Creation time must be 
positive");

Review Comment:
   nit: use checkArgument(), since these are parameters passed in.
   
   Also check that the partitionId value is >= 0?
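
A sketch of what that could look like. Guava's `Preconditions.checkArgument` throws `IllegalArgumentException`, whereas `checkState` throws `IllegalStateException`, so the former describes a bad parameter more accurately. To stay self-contained, the block uses a local `checkArgument` helper that mirrors that behaviour rather than Guava itself; `GeneratorArgsSketch` and the partition id message are assumptions for illustration.

```java
/** Hypothetical sketch of constructor argument validation; not the PR's code. */
final class GeneratorArgsSketch {
  final String tableName;
  final int partitionId;
  final long creationTimeMillis;
  final String suffix; // optional, may be null

  GeneratorArgsSketch(String tableName, int partitionId, long creationTimeMillis, String suffix) {
    // Both values are caller-supplied parameters, so a failure is an illegal argument.
    checkArgument(partitionId >= 0, "Partition id must be non-negative");
    checkArgument(creationTimeMillis > 0, "Creation time must be positive");
    this.tableName = tableName;
    this.partitionId = partitionId;
    this.creationTimeMillis = creationTimeMillis;
    this.suffix = suffix;
  }

  /** Local stand-in mirroring Guava's Preconditions.checkArgument(boolean, Object). */
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  public static void main(String[] args) {
    GeneratorArgsSketch ok = new GeneratorArgsSketch("myTable", 0, 1717000000000L, null);
    System.out.println(ok.partitionId + " " + ok.creationTimeMillis);
  }
}
```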



##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,52 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   *  When either is of type {@link UploadedRealtimeSegmentName} then 
resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the 
given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record 
in current segment
+   */
+  private boolean shouldReplaceOnComparisonTie(String segmentName, String 
currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+LLCSegmentName currentLLCSegmentName = 
LLCSegmentName.of(currentSegmentName);
+if (llcSegmentName != null && currentLLCSegmentName != null) {
+  return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
+}
+
+UploadedRealtimeSegmentName uploadedSegmentName = 
UploadedRealtimeSegmentName.of(segmentName);
+UploadedRealtimeSegmentName currentUploadedSegmentName = 
UploadedRealtimeSegmentName.of(currentSegmentName);
+
+if (uploadedSegmentName != null && currentUploadedSegmentName != null) {
+  int comparisonResult =
+  Integer.compare(uploadedSegmentName.getSequenceId(), 
currentUploadedSegmentName.getSequenceId());
+  if (comparisonResult == 0) {
+Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);

Review Comment:
   missed `return `? 
   
   btw, can inline into `return res == 0? Long.compare(segmentCreationTimeMs, 

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-03 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1624940078


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{
+ * optionalSuffix}
+ *
+ * This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept similar to {@LLCSegmentName} to but differentiates 
between stream generated LLCSegments
+ * based on the prefix "uploaded" and an optional suffix.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable {

Review Comment:
   Agreed, the sequence numbers are externally generated and may not have the 
same meaning. Updating.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-03 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1624938440


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,51 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   *  When either is of type {@link UploadedRealtimeSegmentName} then 
resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *

Review Comment:
   AFAIK, we can create segments using `SegmentIndexCreationDriverImpl`, and 
creationTime is always populated. The segment name format may be unknown, but 
the segment is still uploaded and could be for, say, data correction or addition.
   
   So we can rely on creation time as the first comparison criterion, followed by 
the criteria below:
   - LLC, Unknown: creation time check, otherwise prefer unknown, as this is an 
uploaded segment too (deviates from current behaviour)
   - Uploaded, Unknown: creation time check, otherwise prefer the known name format
   - Unknown, Unknown: creation time check, otherwise false






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-03 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1624938073


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{
+ * optionalSuffix}
+ *
+ * This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept similar to {@LLCSegmentName} to but differentiates 
between stream generated LLCSegments
+ * based on the prefix "uploaded" and an optional suffix.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable {
+
+  private static final String UPLOADED_PREFIX = "uploaded";
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "MMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _tableName;
+  private final int _partitionId;
+  private final int _sequenceId;
+  private final String _creationTime;
+  private final String _segmentName;
+
+  @Nullable
+  private String _suffix = null;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+// split the segment name by the separator and get creation time, sequence 
id, partition id and table name from
+// the end and validate segment name starts with prefix uploaded_
+try {
+  String[] parts = StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR);
+  Preconditions.checkState((parts.length == 5 || parts.length == 6) && 
parts[0].equals(UPLOADED_PREFIX),
+  "Uploaded segment name must be of the format "
+  + 
"uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}");
+  int idx = parts.length - 1;
+  if (parts.length == 6) {
+_suffix = parts[idx--];
+  }
+  _creationTime = parts[idx--];
+  _sequenceId = Integer.parseInt(parts[idx--]);
+  _partitionId = Integer.parseInt(parts[idx]);
+  _tableName = Joiner.on(SEPARATOR).join(Arrays.copyOfRange(parts, 1, 
idx));
+  _segmentName = segmentName;
+} catch (NumberFormatException e) {
+  throw new IllegalArgumentException("Invalid segment name: " + 
segmentName, e);
+}
+  }
+
+  /**
+   * Constructor to create a segment name from the table name, partition id, 
sequence id, creation time and optional
+   * suffix
+   * @param tableName
+   * @param partitionId
+   * @param sequenceId
+   * @param msSinceEpoch
+   * @param suffix
+   */
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, int 
sequenceId, long msSinceEpoch,
+  String suffix) {
+_tableName = tableName;
+_partitionId = partitionId;
+_sequenceId = sequenceId;
+_creationTime = DATE_FORMATTER.print(msSinceEpoch);
+_suffix = suffix;
+_segmentName = Joiner.on(SEPARATOR).skipNulls()
+.join(UPLOADED_PREFIX, tableName, partitionId, sequenceId, 
_creationTime, suffix);
+  }
+
+  public static boolean isUploadedRealtimeSegmentName(String segmentName) {
+String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+if (!(parts.length == 5 || parts.length == 6)) {
+  return false;
+}
+if (!parts[0].equals(UPLOADED_PREFIX)) {
+  return false;
+}
+
+int idx = parts.length == 5 ? parts.length - 1 : parts.length - 2;
+// validate creation time is of format MMdd'T'HHmm'Z'
+try {
+  DATE_FORMATTER.parseDateTime(parts[idx--]);
+} catch (IllegalArgumentException e) {
+  return false;
+}
+
+// return false if sequenceId and partitionId are not int
+try {
+  Integer.parseInt(parts[idx]);
+   

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-03 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1624820901


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{
+ * optionalSuffix}
+ *
+ * This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept similar to {@LLCSegmentName} to but differentiates 
between stream generated LLCSegments
+ * based on the prefix "uploaded" and an optional suffix.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable {
+
+  private static final String UPLOADED_PREFIX = "uploaded";
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "MMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _tableName;
+  private final int _partitionId;
+  private final int _sequenceId;
+  private final String _creationTime;
+  private final String _segmentName;
+
+  @Nullable
+  private String _suffix = null;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+// split the segment name by the separator and get creation time, sequence 
id, partition id and table name from
+// the end and validate segment name starts with prefix uploaded_
+try {
+  String[] parts = StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR);
+  Preconditions.checkState((parts.length == 5 || parts.length == 6) && 
parts[0].equals(UPLOADED_PREFIX),
+  "Uploaded segment name must be of the format "
+  + 
"uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}");
+  int idx = parts.length - 1;
+  if (parts.length == 6) {
+_suffix = parts[idx--];
+  }
+  _creationTime = parts[idx--];
+  _sequenceId = Integer.parseInt(parts[idx--]);
+  _partitionId = Integer.parseInt(parts[idx]);
+  _tableName = Joiner.on(SEPARATOR).join(Arrays.copyOfRange(parts, 1, 
idx));
+  _segmentName = segmentName;
+} catch (NumberFormatException e) {
+  throw new IllegalArgumentException("Invalid segment name: " + 
segmentName, e);
+}
+  }
+
+  /**
+   * Constructor to create a segment name from the table name, partition id, sequence id, creation time and optional
+   * suffix
+   * @param tableName
+   * @param partitionId
+   * @param sequenceId
+   * @param msSinceEpoch
+   * @param suffix
+   */
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, int sequenceId, long msSinceEpoch,
+  String suffix) {

Review Comment:
   `@Nullable` on suffix



##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package 

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-06-03 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1624238350


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}

Review Comment:
   Offline discussion: the final segment name convention has been decided as 
`uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}`.
   This keeps the naming similar to LLCSegmentName and removes the need for a ZK lookup in various critical paths. The `uploaded` prefix identifies the segment type, and the optional suffix can be used to encode additional info, e.g. the source of the segment (spark, flink, minion, etc.).
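
As an illustration of the agreed convention, a name could be split into its fields roughly like this. The sketch uses only the JDK (the PR itself uses `StringUtils.splitByWholeSeparator`), treats the creation time as an opaque string, and assumes the table name contains no `__`. The class, field and method names are hypothetical, and the example timestamp in the usage note is made up.

```java
import java.util.regex.Pattern;

/** Hypothetical parser for uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}. */
final class UploadedNameSketch {
  private static final String PREFIX = "uploaded";
  private static final String SEPARATOR_REGEX = Pattern.quote("__");

  final String tableName;
  final int partitionId;
  final int sequenceId;
  final String creationTime; // kept as an opaque string in this sketch
  final String suffix;       // null when the optional suffix is absent

  private UploadedNameSketch(String tableName, int partitionId, int sequenceId,
      String creationTime, String suffix) {
    this.tableName = tableName;
    this.partitionId = partitionId;
    this.sequenceId = sequenceId;
    this.creationTime = creationTime;
    this.suffix = suffix;
  }

  static UploadedNameSketch parse(String segmentName) {
    // limit -1 keeps trailing empty parts so malformed names are not silently shortened
    String[] parts = segmentName.split(SEPARATOR_REGEX, -1);
    if ((parts.length != 5 && parts.length != 6) || !parts[0].equals(PREFIX)) {
      throw new IllegalArgumentException("Invalid segment name: " + segmentName);
    }
    try {
      int partitionId = Integer.parseInt(parts[2]);
      int sequenceId = Integer.parseInt(parts[3]);
      String suffix = parts.length == 6 ? parts[5] : null;
      return new UploadedNameSketch(parts[1], partitionId, sequenceId, parts[4], suffix);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid segment name: " + segmentName, e);
    }
  }
}
```

For example, `UploadedNameSketch.parse("uploaded__myTable__3__7__20240603T1200Z__spark")` yields partition 3, sequence 7 and suffix `spark`, with no ZK metadata lookup involved.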






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-31 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1622835600


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}

Review Comment:
   The reason to enforce the prefix was to quickly identify an uploaded segment. With an optional suffix, we would have to fall back to the segment status in ZK metadata to know whether it is an uploaded segment (do suggest if there is a better way).
   
   It may also be complex to parse the segment name to figure out partitionId and segmentId.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-31 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1622832355


##
pinot-common/src/main/java/org/apache/pinot/common/utils/Benchmark.java:
##
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Random;
+import java.util.regex.Pattern;
+
+
+public class Benchmark {
+  public static final String SEPARATOR = "_";
+  public static final String UPLOADED_PREFIX = "uploaded";
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";
+  private static final Pattern NAME_PATTERN = Pattern.compile(UPLOADED_REALTIME_SEGMENT_NAME_REGEX);
+
+  private Benchmark() {
+  }
+
+  public static boolean isUploadedRealtimeSegmentNameMethod1(String segmentName) {
+String[] parts = segmentName.split(SEPARATOR);

Review Comment:
   Got you, I wasn't aware of the subtlety. This file was also not supposed to be checked in; removing it.
   I ran a benchmark with 3 methods:
   
   ```
   Method 1 (String.split()) took: 2266343500 ns
   Method 2 (regex match) took: 3760987625 ns
   Method 3 (StringUtils.split()) took: 2412549250 ns
   
   Method 3 is 0.939397817474607 times faster than Method 1
   Method 3 is 1.5589267763134784 times faster than Method 2
   ```
   
   ```
   Method 1 (String.split()) took: 2462188708 ns
   Method 2 (regex match) took: 4130721000 ns
   Method 3 (StringUtils.split()) took: 2082269750 ns
   
   Method 3 is 1.1824542463818628 times faster than Method 1
   Method 3 is 1.983758828557155 times faster than Method 2
   ```
   
   ```
   Method 1 (String.split()) took: 2275362125 ns
   Method 2 (regex match) took: 4017714167 ns
   Method 3 (StringUtils.split()) took: 2003299125 ns
   
   Method 3 is 1.1358074770785915 times faster than Method 1
   Method 3 is 2.0055488053986945 times faster than Method 2
   ```
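
For reference, a comparison of this kind could be set up roughly as below. This is a sketch under assumptions, not the removed `Benchmark.java`: the class and method names are hypothetical, the third method is a hand-written `indexOf` split standing in for `StringUtils.split()` so that the block needs only the JDK, and the timing is naive wall-clock measurement (a harness such as JMH would be more reliable). As far as I know, OpenJDK's `String.split()` takes a non-regex fast path for a single-character separator such as `_`, which may be why Methods 1 and 3 come out so close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

final class SplitBenchmarkSketch {
  private static final Pattern NAME_PATTERN =
      Pattern.compile("^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$");

  /** Method 1: String.split() on a single-character separator. */
  static boolean viaStringSplit(String name) {
    return looksUploaded(name.split("_"));
  }

  /** Method 2: precompiled regex match. */
  static boolean viaRegex(String name) {
    return NAME_PATTERN.matcher(name).matches();
  }

  /** Method 3 stand-in: manual indexOf-based split, no regex involved. */
  static boolean viaManualSplit(String name) {
    List<String> parts = new ArrayList<>();
    int start = 0;
    int end;
    while ((end = name.indexOf('_', start)) >= 0) {
      parts.add(name.substring(start, end));
      start = end + 1;
    }
    parts.add(name.substring(start));
    return looksUploaded(parts.toArray(new String[0]));
  }

  /** Prefix check plus three trailing numeric parts; roughly what the regex accepts. */
  private static boolean looksUploaded(String[] p) {
    int n = p.length;
    return n >= 5 && p[0].equals("uploaded")
        && isDigits(p[n - 1]) && isDigits(p[n - 2]) && isDigits(p[n - 3]);
  }

  private static boolean isDigits(String s) {
    if (s.isEmpty()) {
      return false;
    }
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      if (c < '0' || c > '9') {
        return false;
      }
    }
    return true;
  }

  /** Naive timing of one method over all names, in nanoseconds. */
  static long timeNanos(Predicate<String> method, String[] names) {
    int matched = 0;
    long start = System.nanoTime();
    for (String name : names) {
      if (method.test(name)) {
        matched++;
      }
    }
    long elapsed = System.nanoTime() - start;
    if (matched < 0) {
      throw new IllegalStateException(); // never true; keeps `matched` in use
    }
    return elapsed;
  }
}
```

A run would call, for example, `SplitBenchmarkSketch.timeNanos(SplitBenchmarkSketch::viaRegex, names)` once per method over the same array of generated names and compare the three results.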






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-30 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1621458240


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}

Review Comment:
   This comment might have been missed: 
https://github.com/apache/pinot/pull/13107#discussion_r1619430793 
   
   Basically, I think it would be more flexible to use an optional suffix and not enforce its value, e.g. 
`{tableName}_{partitionId}_{sequenceId}_{creationTime}_{optional suffix}`, as users can use the suffix value to differentiate segments uploaded in different ways (e.g. via minion tasks).






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-30 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1621456711


##
pinot-common/src/main/java/org/apache/pinot/common/utils/Benchmark.java:
##
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Random;
+import java.util.regex.Pattern;
+
+
+public class Benchmark {
+  public static final String SEPARATOR = "_";
+  public static final String UPLOADED_PREFIX = "uploaded";
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";
+  private static final Pattern NAME_PATTERN = Pattern.compile(UPLOADED_REALTIME_SEGMENT_NAME_REGEX);
+
+  private Benchmark() {
+  }
+
+  public static boolean isUploadedRealtimeSegmentNameMethod1(String segmentName) {
+String[] parts = segmentName.split(SEPARATOR);

Review Comment:
   What about `StringUtils.split()`? `String.split()` does some regex matching internally too.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-29 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1619935544


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,44 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   *  When either is of type {@link UploadedRealtimeSegmentName} then resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *
+   * @param segmentName replacing segment
+   * @param currentSegmentName current segment having the record for the given primary key
+   * @param segmentCreationTimeMs creation time of replacing segment
+   * @param currentSegmentCreationTimeMs creation time of current segment
+   * @return true if the record in replacing segment should replace the record in current segment
+   */
+  private boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+if (LLCSegmentName.isLLCSegment(segmentName) && LLCSegmentName.isLLCSegment(currentSegmentName)

Review Comment:
   Thanks, the correction and optimization sound good. Will update.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-29 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1619934716


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";

Review Comment:
   I ran a quick benchmark with 10 million strings: the String.split() method takes about 0.57 times as long as the regex-based one. Going with the String.split()-based approach sounds good.
   
   Test 1
   ```
   Method 1 (String.split()) took: 2253611542 ns
   Method 2 (regex match) took: 3891154500 ns
   Method 2 is 0.5791626988853822 times faster than Method 1
   ```
   
   Test 2
   ```
   Method 1 (String.split()) took: 2156181167 ns
   Method 2 (regex match) took: 3929024375 ns
   Method 2 is 0.5487828430690253 times faster than Method 1
   ```
   
   Test 3
   ```
   Method 1 (String.split()) took: 2167354959 ns
   Method 2 (regex match) took: 3646892125 ns
   Method 2 is 0.5943019109730316 times faster than Method 1
   ```
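
A note on reading these numbers: the printed ratio appears to be Method 1's time divided by Method 2's, so it says String.split() took about 0.58 times as long as the regex match, not that the regex was faster. A conventional speedup figure can be derived as below; `SpeedupSketch` and `speedup` are hypothetical names used only for this illustration.

```java
final class SpeedupSketch {
  /** How many times faster the candidate is than the baseline (a value above 1 means faster). */
  static double speedup(long baselineNanos, long candidateNanos) {
    return (double) baselineNanos / candidateNanos;
  }
}
```

With the Test 1 timings, `SpeedupSketch.speedup(3891154500L, 2253611542L)` is roughly 1.73, i.e. String.split() was about 1.7 times faster than the regex match in that run.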






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-29 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1619430793


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";

Review Comment:
   As to the naming pattern, I wonder whether it would be more flexible to make 
`uploaded` an optional suffix and not enforce the suffix value, 
e.g. `{tableName}_{partId}_{seqId}_{ctime}_{optional suffix}`. For example, I may 
want some minion tasks to upload segments and, to differentiate them from those 
uploaded manually, use the task name as the suffix. wdyt?
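
   To make the parsing implications of an optional suffix concrete, here is a rough sketch. Everything in it is hypothetical (the class name, the pattern, and in particular the assumption that a suffix starts with a letter and contains no `_`); without some such restriction a trailing suffix could not be told apart from the numeric fields.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for {tableName}_{partId}_{seqId}_{ctime}[_{suffix}]; not part of the PR.
final class SuffixedSegmentNameSketch {
  // Assumption: the optional suffix starts with a letter and contains no '_',
  // so it can never be mistaken for one of the numeric fields.
  private static final Pattern PATTERN =
      Pattern.compile("^(.+)_(\\d+)_(\\d+)_(\\d+)(?:_([A-Za-z][A-Za-z0-9]*))?$");

  final String tableName;
  final int partitionId;
  final int sequenceId;
  final long creationTime;
  final String suffix; // null when the name carries no suffix

  private SuffixedSegmentNameSketch(String tableName, int partitionId, int sequenceId, long creationTime,
      String suffix) {
    this.tableName = tableName;
    this.partitionId = partitionId;
    this.sequenceId = sequenceId;
    this.creationTime = creationTime;
    this.suffix = suffix;
  }

  // Returns the parsed name, or null when the name does not follow the pattern.
  static SuffixedSegmentNameSketch of(String segmentName) {
    Matcher matcher = PATTERN.matcher(segmentName);
    if (!matcher.matches()) {
      return null;
    }
    try {
      return new SuffixedSegmentNameSketch(matcher.group(1), Integer.parseInt(matcher.group(2)),
          Integer.parseInt(matcher.group(3)), Long.parseLong(matcher.group(4)), matcher.group(5));
    } catch (NumberFormatException e) {
      return null; // digits that overflow int/long
    }
  }
}
```

   With this pattern a name such as `myTable_3_7_1716940800000_mergeTask` yields the suffix `mergeTask`, while `my_table_3_7_1716940800000` parses with no suffix.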






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-29 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1619257347


##
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##
@@ -158,6 +158,44 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
 }
   }
 
+  /**
+   *  When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   *  When the replacing segment and current segment are of {@link UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   *  When either is of type {@link UploadedRealtimeSegmentName} then resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *
+   * @param segmentName replacing segment
+   * @param currentSegmentName current segment having the record for the given primary key
+   * @param segmentCreationTimeMs creation time of replacing segment
+   * @param currentSegmentCreationTimeMs creation time of current segment
+   * @return true if the record in replacing segment should replace the record in current segment
+   */
+  private boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
+  long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+if (LLCSegmentName.isLLCSegment(segmentName) && LLCSegmentName.isLLCSegment(currentSegmentName)

Review Comment:
   I think the comparison of seqIds shouldn't be part of the if-check, as 
that would miss the case where both are LLC but the seqIds are not `a > b`.
   ```
   if (LLC && LLC) {
     return seqId_a > seqId_b;
   }
   ```
   
   And we can do LLC.of() and check nulls to save some parsing cost, as both 
isLLCSegment() and getSequenceNumber() do parsing inside. Same for the checks 
on uploaded names below: we can do Uploaded.of() and check nulls to save some 
cost of regex evaluation.
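
   A rough sketch of the shape suggested here: parse each name once, null-check, and return the seqId comparison directly when both names are LLC. `LlcName` is a hypothetical stand-in for `LLCSegmentName` (assuming the `{table}__{partition}__{sequence}__{creationTime}` layout), and the creation-time fallback is a simplification rather than the PR's full tie-breaking rule.

```java
// Hypothetical sketch of the parse-once tie-break; not the PR's implementation.
final class TieBreakSketch {

  // Minimal stand-in for LLCSegmentName, assuming {table}__{partition}__{sequence}__{creationTime}.
  static final class LlcName {
    final int sequenceNumber;

    private LlcName(int sequenceNumber) {
      this.sequenceNumber = sequenceNumber;
    }

    // Returns null instead of throwing when the name is not in LLC format.
    static LlcName of(String segmentName) {
      String[] parts = segmentName.split("__");
      if (parts.length != 4) {
        return null;
      }
      try {
        Integer.parseInt(parts[1]); // partition id must be numeric
        return new LlcName(Integer.parseInt(parts[2]));
      } catch (NumberFormatException e) {
        return null;
      }
    }
  }

  static boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
      long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
    // Each name is parsed exactly once.
    LlcName replacing = LlcName.of(segmentName);
    LlcName current = LlcName.of(currentSegmentName);
    if (replacing != null && current != null) {
      // Both LLC: the seqId comparison is the answer, whichever way it goes.
      return replacing.sequenceNumber > current.sequenceNumber;
    }
    // Simplified fallback (assumption): the newer creation time wins.
    return segmentCreationTimeMs > currentSegmentCreationTimeMs;
  }
}
```

   Returning the comparison from inside the branch means an LLC pair with a lower seqId never falls through to the creation-time check, which is the case the original if-condition would have missed.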



##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";
+
+  private static final Pattern NAME_PATTERN = Pattern.compile(UPLOADED_REALTIME_SEGMENT_NAME_REGEX);
+  private static final String UPLOADED_PREFIX = "uploaded";
+  private static final String SEPARATOR = "_";
+  private final String _tableName;
+  private final int _partitionId;
+  private final int _sequenceId;
+  private final long _creationTime;
+  private final String _segmentName;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+Matcher matcher = NAME_PATTERN.matcher(segmentName);
+
+if (matcher.find()) {
+  _tableName = matcher.group(1);
+  _partitionId = Integer.parseInt(matcher.group(2));
+  _sequenceId = Integer.parseInt(matcher.group(3));
+  _creationTime = Long.parseLong(matcher.group(4));
+
+  _segmentName = segmentName;
+} else {
+  throw new IllegalArgumentException("Invalid uploaded realtime segment name: " + segmentName);
+}
+  }
+
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, int sequenceId, long creationTime) {
+_tableName = tableName;
+_partitionId = partitionId;
+_sequenceId = sequenceId;
+_creationTime = 

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-29 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1619054555


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";

Review Comment:
   @klsince, I see that the LLC segment name check does not validate the integer 
and long fields encoded in the name:
   ```
   public static boolean isLLCSegment(String segmentName) {
     int numSeparators = 0;
     int index = 0;
     while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
       numSeparators++;
       index += 2; // SEPARATOR.length()
     }
     return numSeparators == 3;
   }
   ```
   
   
   We can have similar string-split based checks and parsing for uniformity, 
but I think it would be good to validate the int and long fields as well, as 
they get used later in the code. The changes would look like this:
   
   ```
   public UploadedRealtimeSegmentName(String segmentName) {
     // Split the segment name by the separator, read creation time, sequence id and partition id
     // from the end, and validate that the name starts with the prefix uploaded_.
     try {
       String[] parts = segmentName.split(SEPARATOR);
       Preconditions.checkState(parts.length >= 5 && parts[0].equals(UPLOADED_PREFIX),
           "Uploaded segment name must be of the format "
               + "uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}");
       _creationTime = Long.parseLong(parts[parts.length - 1]);
       _sequenceId = Integer.parseInt(parts[parts.length - 2]);
       _partitionId = Integer.parseInt(parts[parts.length - 3]);
       // The table name may itself contain the separator, so rejoin everything between prefix and ids.
       _tableName = Joiner.on(SEPARATOR).join(Arrays.copyOfRange(parts, 1, parts.length - 3));
       _segmentName = segmentName;
     } catch (NumberFormatException e) {
       throw new IllegalArgumentException("Invalid segment name: " + segmentName, e);
     }
   }
   ```
   
   ```
   public static boolean isUploadedRealtimeSegmentName(String segmentName) {
     String[] parts = segmentName.split(SEPARATOR);
     if (parts.length < 5) {
       return false;
     }
     if (!parts[0].equals(UPLOADED_PREFIX)) {
       return false;
     }
     // return false if any of the last three fields is not numeric
     try {
       Long.parseLong(parts[parts.length - 1]);
       Integer.parseInt(parts[parts.length - 2]);
       Integer.parseInt(parts[parts.length - 3]);
     } catch (NumberFormatException e) {
       return false;
     }
     return true;
   }
   ```
   






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-29 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1618286142


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";

Review Comment:
   The regex helped avoid parsing numbers during validation. A separator-based 
approach becomes more complex because it also has to handle number parsing.
   
   The regex `uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$` already handles sequenceId as 
well: all three numeric groups are required.
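
   A small check of what the pattern accepts, as a sketch (the class and the sample names are made up for illustration). It shows that a name without a sequenceId is rejected, and also that a table name ending in a numeric token can hide a missing field, because the greedy `(.+)` gives that token up to the first numeric group.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo of the pattern's behaviour; the sample names are invented.
final class UploadedNameRegexDemo {
  private static final Pattern NAME_PATTERN = Pattern.compile("^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$");

  // True when the whole name matches the uploaded-segment pattern.
  static boolean isValid(String segmentName) {
    return NAME_PATTERN.matcher(segmentName).matches();
  }

  // Returns the table name capture group, or null when the name does not match.
  static String tableNameOf(String segmentName) {
    Matcher matcher = NAME_PATTERN.matcher(segmentName);
    return matcher.matches() ? matcher.group(1) : null;
  }

  public static void main(String[] args) {
    // All three numeric fields present; underscores in the table name are fine.
    System.out.println(isValid("uploaded_my_table_3_7_1716940800000"));
    // sequenceId missing: only two numeric fields follow the table name.
    System.out.println(isValid("uploaded_orders_3_1716940800000"));
    // Caveat: a trailing numeric token of the table name is read as the partition id.
    System.out.println(tableNameOf("uploaded_orders_2024_3_1716940800000"));
  }
}
```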






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-28 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1618004659


##
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##
@@ -44,6 +44,16 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, String r
 if (llcSegmentName != null) {
   return llcSegmentName.getPartitionGroupId();
 }
+
+try {
+  UploadedRealtimeSegmentName uploadedRealtimeSegmentName = new UploadedRealtimeSegmentName(segmentName);

Review Comment:
   We may add a util method `UploadedRealtimeSegmentName.of(segmentName)`, like 
`LLCSegmentName.of()`.
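
   Such a factory could look roughly like the sketch below: a throwing constructor plus a static `of()` that returns null for names that do not parse, so callers can null-check instead of wrapping the constructor in try/catch. `UploadedNameSketch` is a hypothetical stand-in, not the class from the PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for UploadedRealtimeSegmentName with a null-returning factory.
final class UploadedNameSketch {
  private static final Pattern NAME_PATTERN = Pattern.compile("^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$");

  final String tableName;
  final int partitionId;
  final int sequenceId;
  final long creationTime;

  // Throws IllegalArgumentException for names that do not follow the convention.
  UploadedNameSketch(String segmentName) {
    Matcher matcher = NAME_PATTERN.matcher(segmentName);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Invalid uploaded realtime segment name: " + segmentName);
    }
    tableName = matcher.group(1);
    // NumberFormatException (digits overflowing int/long) is itself an IllegalArgumentException.
    partitionId = Integer.parseInt(matcher.group(2));
    sequenceId = Integer.parseInt(matcher.group(3));
    creationTime = Long.parseLong(matcher.group(4));
  }

  // Returns the parsed name, or null when the name is not an uploaded realtime segment name.
  static UploadedNameSketch of(String segmentName) {
    try {
      return new UploadedNameSketch(segmentName);
    } catch (IllegalArgumentException e) {
      return null;
    }
  }
}
```

   A caller in the partition-id lookup would then read `UploadedNameSketch name = UploadedNameSketch.of(segmentName); if (name != null) { return name.partitionId; }`.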



##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";

Review Comment:
   Hmm, why not just split the fields out of the segment name via 
StringUtils.split()? In case the table name includes `_`, we can take the 
required fields counting from the end of the name. And for simplicity, we can 
require the `seqId` field (which seems optional in this regex pattern).






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-28 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1617728542


##
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java:
##
@@ -159,14 +161,24 @@ private SegmentNameGenerator getSegmentNameGenerator(SegmentGeneratorConfig segm
 Boolean.parseBoolean(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)),
 IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
 IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig), dateTimeFormatSpec,
-segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX),
-appendUUIDToSegmentName);
+segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX), appendUUIDToSegmentName);
   case BatchConfigProperties.SegmentNameGeneratorType.INPUT_FILE:
 String inputFileUri = _taskSpec.getCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY);
 return new InputFileSegmentNameGenerator(segmentNameGeneratorConfigs.get(FILE_PATH_PATTERN),
-segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE),
-inputFileUri,
-appendUUIDToSegmentName);
+segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE), inputFileUri, appendUUIDToSegmentName);
+  case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+Preconditions.checkState(segmentGeneratorConfig.getCreationTime() != null,
+"Creation time must be set for uploaded realtime segment name generator");
+Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId() != -1,
+"Valid partition id must be set for uploaded realtime segment name generator");

Review Comment:
   Do you mean for validation?
   
   The sequenceId is passed in through the SegmentNameGenerator method interface:
   
https://github.com/apache/pinot/blob/c2b1132c7e5f5b3e60155466debfb42ff3aa192d/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGenerator.java#L40






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-28 Thread via GitHub


tibrewalpratik17 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1617017946


##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";
+
+  private static final Pattern NAME_PATTERN = Pattern.compile(UPLOADED_REALTIME_SEGMENT_NAME_REGEX);
+  private static final String UPLOADED_PREFIX = "uploaded";
+  private static final String SEPARATOR = "_";
+  private final String _tableName;
+  private final int _partitionId;
+  private final int _sequenceId;
+  private final long _creationTime;
+  private final String _segmentName;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+Matcher matcher = NAME_PATTERN.matcher(segmentName);
+
+if (matcher.find()) {
+  _tableName = matcher.group(1);
+  _partitionId = Integer.parseInt(matcher.group(2));
+  _sequenceId = Integer.parseInt(matcher.group(3));
+  _creationTime = Long.parseLong(matcher.group(4));
+
+  _segmentName = segmentName;
+} else {
+  throw new IllegalArgumentException("Invalid uploaded realtime segment name: " + segmentName);
+}
+  }
+
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, int sequenceId, long creationTime) {
+_tableName = tableName;
+_partitionId = partitionId;
+_sequenceId = sequenceId;
+_creationTime = creationTime;
+_segmentName =
+UPLOADED_PREFIX + SEPARATOR + tableName + SEPARATOR + partitionId + SEPARATOR + sequenceId + SEPARATOR
++ creationTime;

Review Comment:
   nit: we can use `Joiner` for better readability.
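
   For illustration, the joined form could read as below. To stay dependency-free this sketch uses the JDK's `String.join` as a stand-in; with Guava the equivalent would be `Joiner.on(SEPARATOR).join(...)`. The class and method names are hypothetical.

```java
// Hypothetical sketch of building the name with a join instead of chained '+'.
final class SegmentNameJoinSketch {
  private static final String UPLOADED_PREFIX = "uploaded";
  private static final String SEPARATOR = "_";

  // Builds uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}.
  static String buildName(String tableName, int partitionId, int sequenceId, long creationTime) {
    return String.join(SEPARATOR, UPLOADED_PREFIX, tableName, Integer.toString(partitionId),
        Integer.toString(sequenceId), Long.toString(creationTime));
  }
}
```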



##
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
+ *
+ * This naming convention is adopted to represent a batch generated segment uploaded to a realtime table. The naming
+ * convention has been kept different from {@LLCSegmentName} to differentiate between batch generated segments and
+ * low level consumer segments.
+ */
+public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
+
+  public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";
+
+  private static final Pattern NAME_PATTERN = 

Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-17 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1605420533


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", defaultValue = "-1")

Review Comment:
   Summarising the chat discussion with @klsince here:
   Plan to use the naming convention 
`uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}`
   1. The sequenceId is needed for scenarios where bulk data is loaded per 
day and the creation time is constant, e.g. backfilling data per day (creation 
time set to the last second of the day), where each partition has multiple 
segments differentiated only by sequenceId.
   2. A single `_` delimiter (rather than the `__` used in LLC segment names) is 
used so that uploaded segments do not get LLC-segment-like treatment.
   
   Updating the PR accordingly.
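
   To illustrate point 1, a daily backfill might produce names like the ones generated below. This is a sketch under assumptions: the helper names are hypothetical, the creation time is taken to be epoch milliseconds, and the day boundary is taken in UTC.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the naming convention for a per-day backfill.
final class BackfillNamingSketch {

  // Creation time shared by all segments of the day: its last second (UTC), in epoch millis.
  static long endOfDayMillis(LocalDate day) {
    return day.atTime(23, 59, 59).toInstant(ZoneOffset.UTC).toEpochMilli();
  }

  // One name per segment of a partition; only the sequenceId differs between them.
  static List<String> namesForPartition(String tableName, int partitionId, int segmentCount, LocalDate day) {
    long creationTime = endOfDayMillis(day);
    List<String> names = new ArrayList<>();
    for (int sequenceId = 0; sequenceId < segmentCount; sequenceId++) {
      names.add("uploaded_" + tableName + "_" + partitionId + "_" + sequenceId + "_" + creationTime);
    }
    return names;
  }
}
```

   Without the sequenceId field, all of a partition's segments for the same day would collapse to one name, since table, partition and creation time are identical.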






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-17 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1605390671


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", defaultValue = "-1")

Review Comment:
   I actually dropped {sequenceId} intentionally; otherwise the segment name 
becomes LLC format and would cause the complexities you mentioned in the other 
alternative proposal, like the checks on start/end offsets. In fact, the 
endOffset of the latest LLC segment (the one with max seqId) is used to resume 
stream consumption, and we probably don't want uploaded segments to affect that 
logic.
   
   > Do we enforce naming conventions? ... ... add a SegmentNameGenerator 
implementation ...
   
   I don't think so. We may need one just for uploading segments to upsert 
tables, due to their special requirements on partition id and on breaking 
comparison ties. And good point about the need for a new name generator.
   
   btw, from what I learnt while reviewing this PR, there seems to be a design 
choice for RT segments that the {partitionId} encoded in the segment name is the 
source of truth, rather than the one kept in ZK metadata, as there might be no 
partition info at all in ZK metadata. Following that design principle, 
I'd prefer to keep encoding the {partitionId} in the segment name for uploaded 
segments as well. This way we can 1) avoid the cost of reading ZK metadata 
every time we need the partitionId of an uploaded segment; 2) avoid changes to 
the persistent segment metadata in ZK or on disk, which might make things a bit 
easier when considering upgrade/downgrade.
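
   As a sketch of what "the name is the source of truth" could mean in code: the partition id is resolved from the name alone, and only a null result would send a caller on to ZK metadata. The helper below is hypothetical; it assumes the LLC layout `{table}__{partition}__{sequence}__{creationTime}` and, for uploaded segments, the single-`_` layout with a sequence id that this thread later settled on.

```java
// Hypothetical name-only partition lookup; not Pinot's SegmentUtils.
final class PartitionIdFromNameSketch {

  // Returns the partition id encoded in the segment name, or null when the name carries none
  // (in which case a real implementation would fall back to segment metadata).
  static Integer partitionIdFromName(String segmentName) {
    // LLC style: exactly four fields separated by "__".
    String[] llcParts = segmentName.split("__");
    if (llcParts.length == 4 && isInt(llcParts[1]) && isInt(llcParts[2])) {
      return Integer.valueOf(llcParts[1]);
    }
    // Uploaded style: "uploaded" prefix, numeric fields counted from the end.
    String[] parts = segmentName.split("_");
    int n = parts.length;
    if (n >= 5 && parts[0].equals("uploaded") && isInt(parts[n - 3]) && isInt(parts[n - 2])
        && isLong(parts[n - 1])) {
      return Integer.valueOf(parts[n - 3]);
    }
    return null;
  }

  private static boolean isInt(String value) {
    try {
      Integer.parseInt(value);
      return true;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  private static boolean isLong(String value) {
    try {
      Long.parseLong(value);
      return true;
    } catch (NumberFormatException e) {
      return false;
    }
  }
}
```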






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-17 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1604457365


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", defaultValue = "-1")

Review Comment:
   
   > `uploaded_{tableName}__{partitionId}__{creationTime}`
   
   We will have to add a seqId in case multiple segments are to be uploaded, 
something like 
`uploaded_{tableName}__{partitionId}__{sequenceId}__{creationTime}`.
   
   Do we enforce naming conventions? I only see `LLCSegmentName`. I can add one 
for uploaded realtime segments, but I did not prefer this approach if it is 
not something strictly enforced.
   
   Additionally, we will have to add a `SegmentNameGenerator` implementation so 
that the user workload generating and uploading segments can create these 
segments.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-17 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1604457365


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   
   > uploaded_{tableName}__{partitionId}__{creationTime}
   
   We will have to put a seqId in case multiple segments are to be uploaded. 
Something like 
`uploaded_{tableName}__{partitionId}__{sequenceId}__{creationTime}`
   
   Do we enforce naming conventions? I only see `LLCSegmentName`. I can add one 
for uploaded realtime segments, but I would rather avoid this approach if the 
convention is not strictly enforced.
   
   Additionally, we will have to add a `SegmentNameGenerator` implementation so 
that the user workload generating and uploading segments can create these 
segments.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-17 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1604457365


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   > While processing segments not in LLC name pattern, segments added first 
are favored, but we can not control which segments get processed first next 
time when server restarts.
   
   We have segment status as `UPLOADED`, can we consider comparing 
create/upload time to do resolution?
   
   > uploaded_{tableName}__{partitionId}__{creationTime}
   
   Do we enforce naming conventions? I only see `LLCSegmentName`. I can add one 
for uploaded segments of realtime tables, but I would rather avoid this 
approach if the convention is not strictly enforced.
   
   Additionally, we will have to add a `SegmentNameGenerator` implementation so 
that the user workload generating and uploading segments can create these 
segments.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-17 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1604457365


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   > While processing segments not in LLC name pattern, segments added first 
are favored, but we can not control which segments get processed first next 
time when server restarts.
   
   We have segment status as `UPLOADED`, we can consider comparing 
create/upload time to do resolution.
   
   > uploaded_{tableName}__{partitionId}__{creationTime}
   
   Do we enforce naming conventions? I only see `LLCSegmentName`. I can add one 
for uploaded segments of realtime tables, but I would rather avoid this 
approach if the convention is not strictly enforced.
   
   Additionally, we will have to add a `SegmentNameGenerator` implementation so 
that the user workload generating and uploading segments can create these 
segments.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-16 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1604127022


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   I see you have proposed in the description that we can use a new naming 
pattern for uploaded segments. Similarly, I'd propose something a bit simpler: 
`uploaded_{tableName}__{partitionId}__{creationTime}`, with 3 fields 
separated by `__` (double underscore), in order to provide {partitionId} info 
for segment assignment and to help break ties with {creationTime}.
   
   The `uploaded_` prefix can be anything or nothing, but it is better to have 
it so that uploaded segments are easy to look up, e.g. the MergeRollup task 
names the segments it generates with a `merged_` prefix when uploading them to 
the table.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-16 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1604008348


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   Makes sense, thanks for the explanation.
   
   Another concern is that the comparison logic used to break ties (as in 
ConcurrentMapPartitionUpsertMetadataManager) is not deterministic for segments 
that do not follow the LLC naming pattern. When processing such segments, the 
segments added first are favored, but we cannot control which segments get 
processed first the next time the server restarts.
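
One possible way to make the tie-break independent of load order, sketched under the assumption that each uploaded segment carries a creation time: compare by creation time first and fall back to the segment name. The types below are hypothetical stand-ins and do not reflect the actual comparison code in ConcurrentMapPartitionUpsertMetadataManager.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; none of these types exist in Pinot. */
final class UploadedSegmentTieBreakSketch {

  /** Minimal stand-in for the segment info that a comparison would need. */
  static final class SegmentInfo {
    final String _name;
    final long _creationTimeMs;

    SegmentInfo(String name, long creationTimeMs) {
      _name = name;
      _creationTimeMs = creationTimeMs;
    }
  }

  // Newer creation time wins; equal times fall back to the segment name so the
  // result never depends on the order in which segments were loaded.
  static final Comparator<SegmentInfo> ORDER =
      Comparator.comparingLong((SegmentInfo s) -> s._creationTimeMs).thenComparing(s -> s._name);

  private UploadedSegmentTieBreakSketch() {
  }

  static SegmentInfo pickWinner(List<SegmentInfo> candidates) {
    return Collections.max(candidates, ORDER);
  }

  public static void main(String[] args) {
    SegmentInfo a = new SegmentInfo("uploaded_orders__3__0__1000", 1000L);
    SegmentInfo b = new SegmentInfo("uploaded_orders__3__1__1000", 1000L);
    // Same winner regardless of which segment is seen first.
    System.out.println(pickWinner(Arrays.asList(a, b))._name);
    System.out.println(pickWinner(Arrays.asList(b, a))._name);
  }
}
```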






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-16 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1602319612


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   I might have missed it, but how do we configure `SegmentPartitionConfig` in 
TableConfig for tables that allow uploading segments built and partitioned 
externally? What if the segments are partitioned on multiple columns with hash 
functions not supported by Pinot? For such a table, will all of its segments 
be uploaded after being built/partitioned externally, or will there be a mix 
of segments from realtime ingestion and segments built externally?
   
   ~~Another place that might be broken with this new field 
`uploadedSegmentPartitionId` is in MergeRollupTaskGenerator around L430.~~ (nvm 
MergeRollup skips RealTime tables)






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-16 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1603148256


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   > I might have missed it, but how to configure SegmentPartitionConfig in 
TableConfig for tables that allow to upload segments built and partitioned 
externally?
   
   We don't need to configure the table, just as we don't need to for realtime 
stream ingestion into upsert tables.
   
   Providing some more context on how this fits into the current design.
   There are two scenarios where data partitioning comes into play:
   1. Query routing: 
[[docs](https://docs.pinot.apache.org/operators/operating-pinot/tuning/routing#data-ingested-partitioned-by-some-column)]
 Data partitioning is not a requirement here but a good optimization.
   2. Segment assignment:
   a. If the data is partitioned on a single column and with a Pinot 
supported algorithm, we configure the table as:
   ```
   ...
   "tableIndexConfig": {
     ...
     "segmentPartitionConfig": {
       "columnPartitionMap": {
         "memberId": {
           "functionName": "Modulo",
           "numPartitions": 3
         }
       }
     },
     ...
   },
   ```
   **Partitioning for upsert tables**:
   Consuming segment assignment: The stream is always externally 
partitioned (either on the PK or on another field that still ensures all 
records with the same PK fall in the same partition) and does not need to use 
one of the Pinot-supported algorithms. `segmentPartitionConfig` need not be set 
for the upsert table either. Each `LLCSegmentName` contains a partitionId 
substring which is derived from the stream's partitionId. When assigning a 
segment to an instance, we get the partition id by parsing the LLCSegmentName 
in `SegmentUtils.getRealtimeSegmentPartitionId`.
   
   **Uploaded segment assignment**: Uploaded segments are not generated 
with the LLCSegmentName convention. The only way to specify partitioning info 
is via `segmentPartitionConfig` in the table config, which is not possible if 
the stream is using custom partitioning.
   If one wants to backfill/upload segments to a table with such a 
custom-partitioned stream, the uploaded segment must provide the partitionId 
so that segment assignment can put the segments on the same instances as the 
consuming segments of the same partition in the upsert table.
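
The lookup order described above can be sketched roughly as follows: try the segment name first, then fall back to the partition id carried in the uploaded segment's metadata. This is a simplified stand-in, not the real `SegmentUtils.getRealtimeSegmentPartitionId`; the class name, the `resolve` method, and the assumption that LLC-style names have four `__`-separated fields with the partition id in second position are all illustrative.

```java
/** Hypothetical illustration of the lookup order; not the actual Pinot implementation. */
final class PartitionIdResolutionSketch {
  static final int NO_PARTITION_ID = -1;

  private PartitionIdResolutionSketch() {
  }

  /** Assumes LLC-style names of the form {table}__{partitionId}__{sequence}__{creationTime}. */
  static int partitionIdFromLlcStyleName(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return NO_PARTITION_ID;
    }
    try {
      int partitionId = Integer.parseInt(parts[1]);
      return partitionId >= 0 ? partitionId : NO_PARTITION_ID;
    } catch (NumberFormatException e) {
      return NO_PARTITION_ID;
    }
  }

  /**
   * Prefers the id parsed from the name (consuming segments); otherwise uses the id that
   * the externally built segment carried in its metadata (-1 when absent).
   */
  static int resolve(String segmentName, int uploadedSegmentPartitionId) {
    int fromName = partitionIdFromLlcStyleName(segmentName);
    return fromName != NO_PARTITION_ID ? fromName : uploadedSegmentPartitionId;
  }

  public static void main(String[] args) {
    System.out.println(resolve("orders__2__15__20240516T0000Z", NO_PARTITION_ID));
    System.out.println(resolve("backfill_orders_2024_05", 2));
  }
}
```

With either source yielding the same partition id, an uploaded segment and the consuming segments of that partition can be assigned to the same instances.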






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-16 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1603148256


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   > I might have missed it, but how to configure SegmentPartitionConfig in 
TableConfig for tables that allow to upload segments built and partitioned 
externally?
   
   We don't need to configure the table, just as we don't need to for realtime 
stream ingestion into upsert tables.
   
   Providing some more context on why the change is needed.
   There are two scenarios where data partitioning comes into play:
   1. Query routing: 
[[docs](https://docs.pinot.apache.org/operators/operating-pinot/tuning/routing#data-ingested-partitioned-by-some-column)]
 Data partitioning is not a requirement here but a good optimization.
   2. Segment assignment:
   a. If the data is partitioned on a single column and with a Pinot 
supported algorithm, we configure the table as:
   ```
   ...
   "tableIndexConfig": {
     ...
     "segmentPartitionConfig": {
       "columnPartitionMap": {
         "memberId": {
           "functionName": "Modulo",
           "numPartitions": 3
         }
       }
     },
     ...
   },
   ```
   **Partitioning for upsert tables**:
   Consuming segment assignment: The stream is always externally 
partitioned (either on the PK or on another field that still ensures all 
records with the same PK fall in the same partition) and does not need to use 
one of the Pinot-supported algorithms. `segmentPartitionConfig` need not be set 
for the upsert table either. Each `LLCSegmentName` contains a partitionId 
substring which is derived from the stream's partitionId. When assigning a 
segment to an instance, we get the partition id by parsing the LLCSegmentName 
in `SegmentUtils.getRealtimeSegmentPartitionId`.
   
   **Uploaded segment assignment**: Uploaded segments are not generated 
with the LLCSegmentName convention. The only way to specify partitioning info 
is via `segmentPartitionConfig` in the table config, which is not possible if 
the stream is using custom partitioning.
   If one wants to backfill/upload segments to a table with such a 
custom-partitioned stream, the uploaded segment must provide the partitionId 
so that segment assignment can put the segments on the same instances as the 
consuming segments of the same partition in the upsert table.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-15 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1602319612


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   I might have missed it, but how do we configure `SegmentPartitionConfig` in 
TableConfig for tables that allow uploading segments built and partitioned 
externally? What if the segments are partitioned on multiple columns with hash 
functions not supported by Pinot? For such a table, will all of its segments 
be uploaded after being built/partitioned externally, or will there be a mix 
of segments from realtime ingestion and segments built externally?
   
   Another place that might be broken with this new field 
`uploadedSegmentPartitionId` is in MergeRollupTaskGenerator around L430.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-15 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1602098764


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   I see one usage in `SegmentAssignmentUtils.getPartitionId()`, used by 
`ReplicaGroupSegmentAssignmentStrategy` for assigning offline segments.
   
   I think we can fail fast here if both the column partition and 
`uploadedSegmentPartitionId` are provided: `uploadedSegmentPartitionId` is 
meant to keep the assignment the same as the stream partitions for upserts, and 
it does not make sense for an offline table.
   
   Open to better name suggestions for `uploadedSegmentPartitionId` in case it 
sounds misleading.
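
A rough sketch of the fail-fast check mentioned above, assuming `-1` is the "not set" value for `uploadedSegmentPartitionId`. The class and method names are hypothetical and the map value type is left generic; this is not the code in `SegmentAssignmentUtils`.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical validation helper for illustration; not part of Pinot. */
final class PartitionMetadataValidationSketch {
  static final int NO_UPLOADED_PARTITION_ID = -1;

  private PartitionMetadataValidationSketch() {
  }

  /** Rejects metadata that carries both column partition info and an uploaded partition id. */
  static void validate(Map<String, ?> columnPartitionMap, int uploadedSegmentPartitionId) {
    boolean hasColumnPartitions = columnPartitionMap != null && !columnPartitionMap.isEmpty();
    boolean hasUploadedPartitionId = uploadedSegmentPartitionId != NO_UPLOADED_PARTITION_ID;
    if (hasColumnPartitions && hasUploadedPartitionId) {
      throw new IllegalStateException(
          "Both columnPartitionMap and uploadedSegmentPartitionId are set; expected only one");
    }
  }

  public static void main(String[] args) {
    validate(Collections.singletonMap("memberId", "Modulo"), NO_UPLOADED_PARTITION_ID);
    validate(null, 3);
    System.out.println("Both single-source cases pass validation");
  }
}
```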






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-15 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1602067423


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   @klsince, I validated that `getColumnPartitionMap()` is being called from 
broker code, where it is needed for query routing. This should still hold if 
a partition column is configured for the table.
   
   The additional metadata field for partitionId is there to make sure the 
uploaded segments are placed together with the stream partitions assigned to 
hosts by the controller.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-14 Thread via GitHub


klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1600770456


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   There are a few places that directly get columnPartitionMap and iterate over 
it to get the partitionId (you can find them by searching for usages of the 
method `getColumnPartitionMap()` in this class), and this new field might 
break those places.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-13 Thread via GitHub


rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1598868561


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   Updated the description with an Alternatives section. There are assumptions 
in multiple places in the Pinot codebase around LLCSegmentName, so reusing it 
would require much more complex handling to tell uploaded segments apart from 
realtime segments.






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-13 Thread via GitHub


ankitsultana commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1598813431


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;
+  }
+
+  /**
+   * Constructor for the class.
+   *
+   * @param columnPartitionMap Column name to ColumnPartitionMetadata map.
+   */
+  @JsonCreator
+  public SegmentPartitionMetadata(
+  @Nullable @JsonProperty("columnPartitionMap") Map columnPartitionMap,
+  @Nullable @JsonProperty(value = "uploadedSegmentPartitionId", 
defaultValue = "-1")

Review Comment:
   @rohityadav1993 : we could just upload segments with the appropriate name 
format right? Can you add some reasoning in the PR Description about why that 
doesn't work?






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-13 Thread via GitHub


Jackie-Jiang commented on PR #13107:
URL: https://github.com/apache/pinot/pull/13107#issuecomment-2108248763

   cc @snleee @swaminathanmanish to take a look





Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-13 Thread via GitHub


satishd commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1597953408


##
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java:
##
@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
   @Nonnull @JsonProperty("columnPartitionMap") Map columnPartitionMap) {
 Preconditions.checkNotNull(columnPartitionMap);
 _columnPartitionMap = columnPartitionMap;
+_uploadedSegmentPartitionId = -1;

Review Comment:
   It would be good to define a constant that can be used in the other places 
that check for `-1` directly, maybe with a better name.
   
   ```
   public static final int NON_EXTERNAL_PARTITION_ID = -1;
   ```



##
pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java:
##
@@ -35,4 +47,32 @@ public void testGetSegmentCreationTimeMs() {
 segmentZKMetadata.setPushTime(2000L);
 assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 
2000L);
   }
+
+  @Test
+  public void testGetUploadedRealtimeSegmentPartitionId() {

Review Comment:
   It would be good to add a UT that checks that 
`SegmentPartitionMetadata.getUploadedSegmentPartitionId` returns -1 when the 
segment is not externally partitioned.
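
Such a check could look roughly like the sketch below. It uses a small stand-in class rather than the real `SegmentPartitionMetadata` and plain Java instead of a test framework, so the class name, the `isExternallyPartitioned` helper, and the `String` map values are assumptions made only to keep the example self-contained.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for the metadata class; used only to illustrate the default. */
final class PartitionMetadataDefaultSketch {
  static final int NON_EXTERNAL_PARTITION_ID = -1;

  private final Map<String, String> _columnPartitionMap;
  private final int _uploadedSegmentPartitionId;

  /** Mirrors the single-argument constructor: no externally assigned partition. */
  PartitionMetadataDefaultSketch(Map<String, String> columnPartitionMap) {
    this(columnPartitionMap, NON_EXTERNAL_PARTITION_ID);
  }

  PartitionMetadataDefaultSketch(Map<String, String> columnPartitionMap, int uploadedSegmentPartitionId) {
    _columnPartitionMap = columnPartitionMap;
    _uploadedSegmentPartitionId = uploadedSegmentPartitionId;
  }

  Map<String, String> getColumnPartitionMap() {
    return _columnPartitionMap;
  }

  int getUploadedSegmentPartitionId() {
    return _uploadedSegmentPartitionId;
  }

  boolean isExternallyPartitioned() {
    return _uploadedSegmentPartitionId != NON_EXTERNAL_PARTITION_ID;
  }

  public static void main(String[] args) {
    PartitionMetadataDefaultSketch metadata =
        new PartitionMetadataDefaultSketch(Collections.singletonMap("memberId", "Modulo"));
    if (metadata.getUploadedSegmentPartitionId() != NON_EXTERNAL_PARTITION_ID) {
      throw new AssertionError("Expected the default partition id for a non-external segment");
    }
    System.out.println("default uploaded partition id check passed");
  }
}
```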






Re: [PR] [Backfill] allow externally partitioned segment uploads for upsert tables [pinot]

2024-05-09 Thread via GitHub


codecov-commenter commented on PR #13107:
URL: https://github.com/apache/pinot/pull/13107#issuecomment-2102011974

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/13107?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report
   Attention: Patch coverage is `0%`, with `23 lines` in your changes missing coverage. Please review.
   > Project coverage is 0.00%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) to head [(`97dc2cf`)](https://app.codecov.io/gh/apache/pinot/pull/13107?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 420 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/13107?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Patch % | Lines |
   |---|---|---|
   | [...mon/metadata/segment/SegmentPartitionMetadata.java](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree=pinot-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcommon%2Fmetadata%2Fsegment%2FSegmentPartitionMetadata.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvc2VnbWVudC9TZWdtZW50UGFydGl0aW9uTWV0YWRhdGEuamF2YQ==) | 0.00% | [6 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) |
   | [...va/org/apache/pinot/common/utils/SegmentUtils.java](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree=pinot-common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcommon%2Futils%2FSegmentUtils.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU2VnbWVudFV0aWxzLmphdmE=) | 0.00% | [4 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) |
   | [...ot/segment/spi/creator/SegmentGeneratorConfig.java](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree=pinot-segment-spi%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fsegment%2Fspi%2Fcreator%2FSegmentGeneratorConfig.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cGlub3Qtc2VnbWVudC1zcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3NlZ21lbnQvc3BpL2NyZWF0b3IvU2VnbWVudEdlbmVyYXRvckNvbmZpZy5qYXZh) | 0.00% | [4 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) |
   | [...egment/spi/index/metadata/SegmentMetadataImpl.java](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree=pinot-segment-spi%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fsegment%2Fspi%2Findex%2Fmetadata%2FSegmentMetadataImpl.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cGlub3Qtc2VnbWVudC1zcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3NlZ21lbnQvc3BpL2luZGV4L21ldGFkYXRhL1NlZ21lbnRNZXRhZGF0YUltcGwuamF2YQ==) | 0.00% | [4 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) |
   | [...ment/creator/impl/SegmentColumnarIndexCreator.java](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree=pinot-segment-local%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fsegment%2Flocal%2Fsegment%2Fcreator%2Fimpl%2FSegmentColumnarIndexCreator.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvaW1wbC9TZWdtZW50Q29sdW1uYXJJbmRleENyZWF0b3IuamF2YQ==) | 0.00% | [3 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) |
   | [...ot/controller/helix/core/util/ZKMetadataUtils.java](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree=pinot-controller%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcontroller%2Fhelix%2Fcore%2Futil%2FZKMetadataUtils.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3V0aWwvWktNZXRhZGF0YVV0aWxzLmphdmE=) | 0.00% | [2 Missing :warning:](https://app.codecov.io/gh/apache/pinot/pull/13107?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) |
   
   Additional details and impacted files
   
   
   ```diff