pvillard31 commented on code in PR #10881:
URL: https://github.com/apache/nifi/pull/10881#discussion_r2822763838


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessorTest.java:
##########
@@ -363,6 +365,45 @@ void testInvalidRecordsWithSchemaEvolution() {
         
secondFailureFlowFile.assertContentEquals(KinesisRecordPayload.extract(records.get(4)),
 UTF_8);
     }
 
+    @Test
+    void testMillisBehindLatestAttributeOnSuccessAndFailure() {
+        final List<KinesisClientRecord> records = List.of(
+                createKinesisRecord(USER_JSON_1, "1"),
+                createKinesisRecord(INVALID_JSON, "2")
+        );
+        final long millis = 5000L;
+        final ConsumeRecordsResult consumeResult = new 
ConsumeRecordsResult(records, millis);
+
+        final ProcessingResult result = processor.processRecords(session, 
TEST_STREAM_NAME, TEST_SHARD_ID, consumeResult);
+
+        assertEquals(1, result.successFlowFiles().size());
+        assertEquals(1, result.parseFailureFlowFiles().size());
+
+        final FlowFile successFlowFile = result.successFlowFiles().getFirst();
+        assertEquals(String.valueOf(millis), 
successFlowFile.getAttribute(MILLIS_BEHIND_LATEST));
+
+        final FlowFile failureFlowFile = 
result.parseFailureFlowFiles().getFirst();
+        assertEquals(String.valueOf(millis), 
failureFlowFile.getAttribute(MILLIS_BEHIND_LATEST));
+    }
+
+    @Test
+    void testNullMillisBehindLatestAttributeNotSet() {
+        final List<KinesisClientRecord> records = List.of(
+                createKinesisRecord(USER_JSON_1, "1")
+        );
+
+        final ProcessingResult result = processor.processRecords(session, 
TEST_STREAM_NAME, TEST_SHARD_ID, new ConsumeRecordsResult(records, null));
+
+        assertEquals(1, result.successFlowFiles().size());
+
+        final FlowFile successFlowFile = result.successFlowFiles().getFirst();
+        assertEquals(null, successFlowFile.getAttribute(MILLIS_BEHIND_LATEST));

Review Comment:
   assertNull



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java:
##########
@@ -233,6 +241,12 @@ void 
testConsumeSingleMessageFromSingleShard_withoutEnhancedFanOut() {
         assertTrue(
                 streamClient.getEnhancedFanOutConsumerNames().isEmpty(),
                 "No enhanced fan-out consumers should be created for Shared 
Throughput consumer type");
+        final String shardId = flowFile.getAttribute("aws.kinesis.shard.id");
+        final String gaugeName = makeMillisBehindLatestGaugeName(streamName, 
shardId);
+        final List<Double> gaugeValues = runner.getGaugeValues(gaugeName);
+        assertFalse(gaugeValues.isEmpty(), "Expected millisBehindLatest gauge 
to be recorded");
+
+

Review Comment:
   extra lines not needed



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java:
##########
@@ -559,26 +561,67 @@ void testCommitRecordsWhileNewRecordsArrive() {
 
         // Add records to pending queue.
         final List<KinesisClientRecord> batch1 = createTestRecords(2);
-        recordBuffer.addRecords(bufferId, batch1, checkpointer1);
+        recordBuffer.addRecords(bufferId, batch1, checkpointer1, 100L);
 
         // Consume batch1 records (moves from pending to in-progress).
         final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
 
-        final List<KinesisClientRecord> consumedRecords = 
recordBuffer.consumeRecords(lease);
+        final List<KinesisClientRecord> consumedRecords = 
recordBuffer.consumeRecords(lease).records();
         assertEquals(batch1, consumedRecords);
 
         // Add more records while others are in-progress.
         final List<KinesisClientRecord> batch2 = createTestRecords(1);
-        recordBuffer.addRecords(bufferId, batch2, checkpointer2);
+        recordBuffer.addRecords(bufferId, batch2, checkpointer2, 100L);
 
         // Commit in-progress records.
         recordBuffer.commitConsumedRecords(lease);
 
         // Consume batch2 records.
-        final List<KinesisClientRecord> remainingRecords = 
recordBuffer.consumeRecords(lease);
+        final List<KinesisClientRecord> remainingRecords = 
recordBuffer.consumeRecords(lease).records();
         assertEquals(batch2, remainingRecords);
     }
 
+    @Test
+    void testConsumeRecordsReturnsLastMillisBehindLatest() {
+        final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+
+        recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer1, 
200L);
+        recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer2, 
500L);
+
+        final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+        final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+
+        assertEquals(2, result.records().size());
+        assertEquals(500L, result.millisBehindLatest());
+    }
+
+    @Test
+    void testConsumeRecordsWithNullMillisBehindLatest() {
+        final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+
+        recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer1, 
null);
+
+        final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+        final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+
+        assertEquals(1, result.records().size());
+        assertNull(result.millisBehindLatest());
+    }
+
+    @Test
+    void testConsumeRecordsWithMixedMillisBehindLatest() {
+        final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+
+        recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer1, 
300L);
+        recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer2, 
null);
+
+        final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+        final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+
+        assertEquals(2, result.records().size());
+        assertEquals(300, result.millisBehindLatest());

Review Comment:
   ```suggestion
           assertEquals(300L, result.millisBehindLatest());
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -696,22 +705,29 @@ private void checkInitializationResult(final 
InitializationResult initialization
         }
     }
 
+
+

Review Comment:
   extra lines not needed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to