sajjad-moradi commented on code in PR #10121:
URL: https://github.com/apache/pinot/pull/10121#discussion_r1082029067
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -76,6 +76,15 @@
public class IngestionDelayTracker {
+ // Class to wrap supported timestamps collected for an ingested event
+ private static class IngestionTimestamps {
+ IngestionTimestamps(long ingestionTimesMs, long creationTimeMs) {
+ _ingestionTimeMs = ingestionTimesMs;
+ _creationTimeMs = creationTimeMs;
Review Comment:
`_ingestionTimeMs` is the time when a message was published to the last
stream, and `_creationTimeMs` is the time when it was published to the first
stream.
Should we rename them to something like `lastStreamPublishTimeMs` and
`firstStreamPublishTimeMs`? At a minimum, we need comments that clearly
specify what each variable holds.
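
For illustration only, the rename could look roughly like the sketch below. The class name `IngestionTimestampsSketch` and the accessor names are hypothetical, and the comment describing the "last stream" as the one Pinot reads from is an assumption, not something stated in the PR.

```java
// Hypothetical sketch: names follow the suggestion above, not the code in the PR.
final class IngestionTimestampsSketch {
  // Epoch millis when the message was published to the last stream
  // (assumed here to be the stream Pinot reads from).
  private final long _lastStreamPublishTimeMs;
  // Epoch millis when the message was published to the first stream in the pipeline.
  private final long _firstStreamPublishTimeMs;

  IngestionTimestampsSketch(long lastStreamPublishTimeMs, long firstStreamPublishTimeMs) {
    _lastStreamPublishTimeMs = lastStreamPublishTimeMs;
    _firstStreamPublishTimeMs = firstStreamPublishTimeMs;
  }

  long getLastStreamPublishTimeMs() {
    return _lastStreamPublishTimeMs;
  }

  long getFirstStreamPublishTimeMs() {
    return _firstStreamPublishTimeMs;
  }
}
```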
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -615,7 +615,8 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
}
} else if (!prematureExit) {
// Record Pinot ingestion delay as zero since we are up-to-date and no new events
- _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), _partitionGroupId);
+ _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), System.currentTimeMillis(),
Review Comment:
Refactor to `long now = System.currentTimeMillis(); ...` and pass `now` for
both timestamp arguments.
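
A minimal sketch of that refactor, under stated assumptions: `DelayUpdater` is a hypothetical stand-in for `_realtimeTableDataManager`, and the three-argument shape (ingestion time, creation time, partition id) is inferred from the test changes in this PR, since the hunk above is cut off after the second argument. Reading the clock once also guarantees that both timestamps are identical, which two separate calls do not.

```java
class ZeroDelayRecorderSketch {
  // Hypothetical stand-in for _realtimeTableDataManager; the signature is an assumption.
  interface DelayUpdater {
    void updateIngestionDelay(long ingestionTimeMs, long creationTimeMs, int partitionGroupId);
  }

  // Records "zero delay" by reporting the same current time for both timestamps.
  static void recordZeroDelay(DelayUpdater updater, int partitionGroupId) {
    long now = System.currentTimeMillis(); // read the clock once
    updater.updateIngestionDelay(now, now, partitionGroupId);
  }
}
```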
##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java:
##########
@@ -85,29 +85,37 @@ public void testRecordIngestionDelayWithNoAging() {
// Test we follow a single partition up and down
for (long i = 0; i <= maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, partition0);
+ ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+ clock.millis() - (i + 1));
Review Comment:
Parentheses are not needed in `(i + 1)` here either. Please remove all other
redundant parentheses.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -49,6 +49,17 @@ public interface RowMetadata {
*/
long getRecordIngestionTimeMs();
+ /**
+ * Returns the creation timestamp associated with the record. In cases where the upstream ingestion pipeline is
+ * simple this timestamp matches the result of getRecordIngestionTimeMs();
+ *
+ * Expected to be used for stream-based sources.
+ *
+ * @return timestamp (epoch in milliseconds) when the row was initially created and ingested upstream for the first
+ * time Long.MIN_VALUE if not available
+ */
+ long getRecordCreationTimeMs();
Review Comment:
Shouldn't this be a `default` method that returns `Long.MIN_VALUE`, so that
we don't break the existing implementations for different streams?
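
A rough sketch of the idea, using hypothetical names (`RowMetadataSketch`, `LegacyRowMetadata`) rather than the real Pinot types: with a `default` method, an implementation written before the new method existed still compiles and reports "not available".

```java
// Hypothetical, trimmed-down stand-in for the RowMetadata interface.
interface RowMetadataSketch {
  long getRecordIngestionTimeMs();

  // Default body keeps existing implementations source- and binary-compatible.
  default long getRecordCreationTimeMs() {
    return Long.MIN_VALUE; // "not available", per the Javadoc in the diff
  }
}

// Hypothetical pre-existing stream implementation that never overrides the new method.
class LegacyRowMetadata implements RowMetadataSketch {
  private final long _ingestionTimeMs;

  LegacyRowMetadata(long ingestionTimeMs) {
    _ingestionTimeMs = ingestionTimeMs;
  }

  @Override
  public long getRecordIngestionTimeMs() {
    return _ingestionTimeMs;
  }
}
```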
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -76,6 +76,15 @@
public class IngestionDelayTracker {
+ // Class to wrap supported timestamps collected for an ingested event
+ private static class IngestionTimestamps {
+ IngestionTimestamps(long ingestionTimesMs, long creationTimeMs) {
+ _ingestionTimeMs = ingestionTimesMs;
+ _creationTimeMs = creationTimeMs;
+ }
+ public final long _ingestionTimeMs;
+ public final long _creationTimeMs;
Review Comment:
Do these need to be public?
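
One possible answer, sketched below with hypothetical names: because `IngestionTimestamps` is a private static nested class, the enclosing class can still read its fields when they are declared `private`, so `public` adds nothing. The `endToEndDelayMs` helper exists only to show the access compiling; it is not taken from the PR.

```java
class IngestionDelayTrackerSketch {
  // Same shape as the nested class in the diff, with private fields.
  private static class IngestionTimestamps {
    private final long _ingestionTimeMs;
    private final long _creationTimeMs;

    IngestionTimestamps(long ingestionTimeMs, long creationTimeMs) {
      _ingestionTimeMs = ingestionTimeMs;
      _creationTimeMs = creationTimeMs;
    }
  }

  // Hypothetical helper: the enclosing class reads the private fields directly.
  static long endToEndDelayMs(long nowMs, long ingestionTimeMs, long creationTimeMs) {
    IngestionTimestamps ts = new IngestionTimestamps(ingestionTimeMs, creationTimeMs);
    return nowMs - ts._creationTimeMs;
  }

  // Hypothetical helper: delay measured from the last stream only.
  static long ingestionDelayMs(long nowMs, long ingestionTimeMs, long creationTimeMs) {
    IngestionTimestamps ts = new IngestionTimestamps(ingestionTimeMs, creationTimeMs);
    return nowMs - ts._ingestionTimeMs;
  }
}
```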
##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java:
##########
@@ -85,29 +85,37 @@ public void testRecordIngestionDelayWithNoAging() {
// Test we follow a single partition up and down
for (long i = 0; i <= maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, partition0);
+ ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
Review Comment:
Parentheses are not needed in `(i + 1)`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]