Jackie-Jiang commented on a change in pull request #7200:
URL: https://github.com/apache/pinot/pull/7200#discussion_r682018281
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -404,10 +406,10 @@ public boolean hasNext() {
values[i] = value;
}
PrimaryKey primaryKey = new PrimaryKey(values);
- Object timeValue = columnToReaderMap.get(_timeColumnName).getValue(_docId);
- Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
- long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
- return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, _docId++, timestamp);
+ Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
+ Preconditions.checkArgument(upsertComparisonValue instanceof Comparable, "time column shall be comparable");
Review comment:
```suggestion
Preconditions.checkState(upsertComparisonValue instanceof Comparable,
    "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
```
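For context on this suggestion: Guava's `Preconditions.checkState` throws `IllegalStateException`, while `checkArgument` throws `IllegalArgumentException`, and both accept a `%s` message template. Below is a minimal plain-Java sketch of what the suggested check amounts to. `UpsertChecks` and `requireComparable` are hypothetical names used only for illustration; they are not part of Pinot or Guava.

```java
/** Hypothetical stand-in for the suggested Preconditions.checkState call; not Pinot code. */
final class UpsertChecks {
  private UpsertChecks() {
  }

  /**
   * Returns the value as a Comparable, or throws IllegalStateException
   * with a message that names the offending column.
   */
  static Comparable<?> requireComparable(Object value, String columnName) {
    if (!(value instanceof Comparable)) {
      throw new IllegalStateException(
          String.format("Upsert comparison column: %s must be comparable", columnName));
    }
    return (Comparable<?>) value;
  }
}
```

Including the column name in the message makes a misconfigured comparison column easier to spot than the generic "time column shall be comparable" text.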
##########
File path:
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordLocation.java
##########
@@ -27,12 +27,13 @@
public class RecordLocation {
private final IndexSegment _segment;
private final int _docId;
- private final long _timestamp;
+ /** value used to denote the order */
+ private final Comparable _comparisonValue;
- public RecordLocation(IndexSegment indexSegment, int docId, long timestamp) {
+ public RecordLocation(IndexSegment indexSegment, int docId, Comparable timestamp) {
Review comment:
```suggestion
public RecordLocation(IndexSegment indexSegment, int docId, Comparable comparisonValue) {
```
##########
File path:
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -180,7 +180,7 @@ public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, Gene
// Update the record location when the new timestamp is greater than or equal to the current timestamp. Update
Review comment:
(nit) Update the comments to refer to the comparison value instead of the timestamp. Same for other places.
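To illustrate the rule the quoted comment describes, here is a simplified sketch in which a record replaces the current one when its comparison value is greater than or equal to the stored value. `UpsertSketch`, `Location`, `update`, and `docIdOf` are hypothetical names, and the string primary key is an assumption made for brevity; this is not the `PartitionUpsertMetadataManager` implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of comparison-based upsert; not the Pinot implementation. */
@SuppressWarnings({"rawtypes", "unchecked"})
final class UpsertSketch {

  /** Latest known location of a primary key: a doc id plus the value used to order records. */
  static final class Location {
    final int _docId;
    final Comparable _comparisonValue;

    Location(int docId, Comparable comparisonValue) {
      _docId = docId;
      _comparisonValue = comparisonValue;
    }
  }

  private final Map<String, Location> _locations = new HashMap<>();

  /**
   * Stores the new location when the key is unseen or when the new comparison value
   * is greater than or equal to the current one. Returns true if the entry was updated.
   */
  boolean update(String primaryKey, int docId, Comparable comparisonValue) {
    Location current = _locations.get(primaryKey);
    if (current == null || comparisonValue.compareTo(current._comparisonValue) >= 0) {
      _locations.put(primaryKey, new Location(docId, comparisonValue));
      return true;
    }
    return false;
  }

  /** Returns the doc id currently stored for the key (the key must have been seen). */
  int docIdOf(String primaryKey) {
    return _locations.get(primaryKey)._docId;
  }
}
```

With `>=`, a later record carrying an equal comparison value still wins, which matches the "greater than or equal to" wording in the comment.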
##########
File path:
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -319,6 +319,7 @@ private static void validateTaskConfigs(TableConfig tableConfig) {
* - the primary key exists on the schema
* - strict replica-group is configured for routing type
* - consumer type must be low-level
+ * - comparison column exists
Review comment:
(nit) indentation
##########
File path:
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
##########
@@ -70,7 +71,8 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str
.setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
.setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
- .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build();
+ .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
+ .setUpsertComparisonColumn(comparisonColumn).build();
Review comment:
(nit) Move this before the metadata manager.
##########
File path:
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -507,11 +511,11 @@ private boolean isUpsertEnabled() {
private GenericRow handleUpsert(GenericRow row, int docId) {
PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
- Object timeValue = row.getValue(_timeColumnName);
- Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
- long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
- return _partitionUpsertMetadataManager
- .updateRecord(this, new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, timestamp), row);
+ Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
+ Preconditions.checkArgument(upsertComparisonValue instanceof Comparable,
Review comment:
```suggestion
Preconditions.checkState(upsertComparisonValue instanceof Comparable,
"Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
```