sajjad-moradi commented on code in PR #10121:
URL: https://github.com/apache/pinot/pull/10121#discussion_r1082029067


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -76,6 +76,15 @@
 
 public class IngestionDelayTracker {
 
+  // Class to wrap supported timestamps collected for an ingested event
+  private static class IngestionTimestamps {
+    IngestionTimestamps(long ingestionTimesMs, long creationTimeMs) {
+      _ingestionTimeMs = ingestionTimesMs;
+      _creationTimeMs = creationTimeMs;

Review Comment:
   `_ingestionTimeMs` is basically the time when a message was published to the 
last stream.
   `_creationTimeMs` is the time when a message was published to the first 
stream.
   
   Should we rename them to something like `lastStreamPublishTimeMs` and 
`firstStreamPublishTimeMs`? If not, we should at least add comments that 
clearly specify what each variable represents.
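
A minimal sketch of the rename suggested above, assuming the semantics described in this comment. The field names are the suggested ones, not what the PR currently uses. The enclosing `StreamTimestampsSketch` class and the two delay helpers are hypothetical and exist only to make the snippet self-contained; the helpers mirror the "now minus timestamp" shape asserted in the PR's test.

```java
// Hypothetical enclosing class, used only to keep the sketch self-contained.
final class StreamTimestampsSketch {

  // Sketch of IngestionTimestamps with the names proposed in the review comment.
  static final class IngestionTimestamps {
    // Time the message was published to the last stream in the pipeline.
    final long _lastStreamPublishTimeMs;
    // Time the message was published to the first stream in the pipeline.
    final long _firstStreamPublishTimeMs;

    IngestionTimestamps(long lastStreamPublishTimeMs, long firstStreamPublishTimeMs) {
      _lastStreamPublishTimeMs = lastStreamPublishTimeMs;
      _firstStreamPublishTimeMs = firstStreamPublishTimeMs;
    }
  }

  // Delay relative to the last stream (hypothetical helper).
  static long ingestionDelayMs(IngestionTimestamps ts, long nowMs) {
    return nowMs - ts._lastStreamPublishTimeMs;
  }

  // Delay relative to the first stream, i.e. end to end (hypothetical helper).
  static long endToEndDelayMs(IngestionTimestamps ts, long nowMs) {
    return nowMs - ts._firstStreamPublishTimeMs;
  }
}
```

With these names, the argument order in the constructor documents itself at the call site, which is the main point of the rename.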



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -615,7 +615,8 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
       }
     } else if (!prematureExit) {
       // Record Pinot ingestion delay as zero since we are up-to-date and no new events
-      _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), _partitionGroupId);
+      _realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(), System.currentTimeMillis(),

Review Comment:
   Read the clock once and pass the same value for both arguments: refactor to 
`long now = System.currentTimeMillis(); ...`
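
A minimal sketch of that refactor. The `DelayUpdater` interface is a hypothetical stand-in for `_realtimeTableDataManager`, and the third parameter is an assumption: the diff line is truncated after the second argument, so the real signature may differ.

```java
// Hypothetical enclosing class, used only to keep the sketch self-contained.
final class SingleClockReadSketch {

  // Hypothetical stand-in for the table data manager; the partition parameter is assumed.
  interface DelayUpdater {
    void updateIngestionDelay(long ingestionTimeMs, long creationTimeMs, int partitionGroupId);
  }

  // Records "zero delay" by reading the clock once and reusing the value.
  static void recordZeroDelay(DelayUpdater updater, int partitionGroupId) {
    long now = System.currentTimeMillis();
    updater.updateIngestionDelay(now, now, partitionGroupId);
  }
}
```

Besides being shorter, a single read guarantees that both timestamps are identical; two separate calls can straddle a millisecond boundary.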



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java:
##########
@@ -85,29 +85,37 @@ public void testRecordIngestionDelayWithNoAging() {
 
     // Test we follow a single partition up and down
     for (long i = 0; i <= maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+          clock.millis() - (i + 1));

Review Comment:
   The parentheses around `(i + 1)` are not needed here either. Please remove 
all other redundant parentheses.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -49,6 +49,17 @@ public interface RowMetadata {
    */
   long getRecordIngestionTimeMs();
 
+  /**
+   * Returns the creation timestamp associated with the record. In cases where the upstream ingestion pipeline is
+   * simple this timestamp matches the result of getRecordIngestionTimeMs();
+   *
+   * Expected to be used for stream-based sources.
+   *
+   * @return timestamp (epoch in milliseconds) when the row was initially created and ingested upstream for the first
+   *         time Long.MIN_VALUE if not available
+   */
+  long getRecordCreationTimeMs();

Review Comment:
   Shouldn't this be a default method that returns `Long.MIN_VALUE`, so that 
existing stream implementations of this interface don't break?
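
A minimal sketch of what a default implementation could look like. The `RowMetadata` interface here is a trimmed local copy with only the two methods shown in the diff, and `LegacyRowMetadata` is a hypothetical implementation written before the new method existed.

```java
// Hypothetical enclosing class, used only to keep the sketch self-contained.
final class DefaultMethodSketch {

  // Trimmed local copy of the interface; only the two methods from the diff.
  interface RowMetadata {
    long getRecordIngestionTimeMs();

    // Default keeps existing implementations source- and binary-compatible.
    default long getRecordCreationTimeMs() {
      return Long.MIN_VALUE;
    }
  }

  // Hypothetical pre-existing implementation that does not know about creation time.
  static final class LegacyRowMetadata implements RowMetadata {
    private final long _ingestionTimeMs;

    LegacyRowMetadata(long ingestionTimeMs) {
      _ingestionTimeMs = ingestionTimeMs;
    }

    @Override
    public long getRecordIngestionTimeMs() {
      return _ingestionTimeMs;
    }
  }
}
```

`LegacyRowMetadata` compiles unchanged and reports `Long.MIN_VALUE` for the creation time, which matches the "not available" value in the Javadoc.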



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -76,6 +76,15 @@
 
 public class IngestionDelayTracker {
 
+  // Class to wrap supported timestamps collected for an ingested event
+  private static class IngestionTimestamps {
+    IngestionTimestamps(long ingestionTimesMs, long creationTimeMs) {
+      _ingestionTimeMs = ingestionTimesMs;
+      _creationTimeMs = creationTimeMs;
+    }
+    public final long _ingestionTimeMs;
+    public final long _creationTimeMs;

Review Comment:
   Do these need to be public?
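
For context, Java lets an enclosing class read the private members of its nested classes, so the `public` modifier adds nothing on a private nested class. A minimal sketch follows, with a hypothetical enclosing class and helper method standing in for `IngestionDelayTracker`:

```java
// Hypothetical stand-in for the enclosing tracker class.
final class NestedVisibilitySketch {

  private static class IngestionTimestamps {
    // Private fields are still readable from the enclosing class.
    private final long _ingestionTimeMs;
    private final long _creationTimeMs;

    IngestionTimestamps(long ingestionTimeMs, long creationTimeMs) {
      _ingestionTimeMs = ingestionTimeMs;
      _creationTimeMs = creationTimeMs;
    }
  }

  // Hypothetical helper showing direct field access from the enclosing class.
  static long creationToIngestionGapMs(long ingestionTimeMs, long creationTimeMs) {
    IngestionTimestamps ts = new IngestionTimestamps(ingestionTimeMs, creationTimeMs);
    return ts._ingestionTimeMs - ts._creationTimeMs;
  }
}
```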



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java:
##########
@@ -85,29 +85,37 @@ public void testRecordIngestionDelayWithNoAging() {
 
     // Test we follow a single partition up and down
     for (long i = 0; i <= maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, partition0);
+      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);

Review Comment:
   Parentheses are not needed in `(i + 1)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

