This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e418f7  [FLINK-32229][Connector/Kinesis] Add millisBehindLatest 
metric to Kinesis source implementation
9e418f7 is described below

commit 9e418f7c447faf36517b69f1199e3f46e27c9ad2
Author: Burak Ozakinci <bura...@amazon.co.uk>
AuthorDate: Fri Jun 14 22:42:26 2024 +0100

    [FLINK-32229][Connector/Kinesis] Add millisBehindLatest metric to Kinesis 
source implementation
---
 .../kinesis/source/KinesisStreamsSource.java       |  13 ++-
 .../source/metrics/KinesisShardMetrics.java        |  81 +++++++++++++
 .../kinesis/source/metrics/MetricConstants.java    |  34 ++++++
 .../source/reader/KinesisStreamsSourceReader.java  |  47 +++++++-
 .../reader/PollingKinesisShardSplitReader.java     |  11 +-
 .../source/metrics/KinesisShardMetricsTest.java    |  95 +++++++++++++++
 .../reader/KinesisStreamsSourceReaderTest.java     |  84 +++++++++++---
 .../reader/PollingKinesisShardSplitReaderTest.java | 127 ++++++++++++---------
 .../source/util/KinesisContextProvider.java        |  50 ++++++++
 .../source/util/KinesisStreamProxyProvider.java    |   1 +
 .../connector/kinesis/source/util/TestUtil.java    |  29 +++++
 11 files changed, 496 insertions(+), 76 deletions(-)

diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index 4ed24c0..c3455ee 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner
 import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
 import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
 import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
+import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
 import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
 import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
@@ -55,7 +56,9 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.utils.AttributeMap;
 
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 
 /**
@@ -124,10 +127,15 @@ public class KinesisStreamsSource<T>
 
         FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> 
elementsQueue =
                 new FutureCompletingBlockingQueue<>();
+
+        Map<String, KinesisShardMetrics> shardMetricGroupMap = new 
ConcurrentHashMap<>();
+
         // We create a new stream proxy for each split reader since they have 
their own independent
         // lifecycle.
         Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
-                () -> new 
PollingKinesisShardSplitReader(createKinesisStreamProxy(sourceConfig));
+                () ->
+                        new PollingKinesisShardSplitReader(
+                                createKinesisStreamProxy(sourceConfig), 
shardMetricGroupMap);
         KinesisStreamsRecordEmitter<T> recordEmitter =
                 new KinesisStreamsRecordEmitter<>(deserializationSchema);
 
@@ -136,7 +144,8 @@ public class KinesisStreamsSource<T>
                 new SingleThreadFetcherManager<>(elementsQueue, 
splitReaderSupplier::get),
                 recordEmitter,
                 sourceConfig,
-                readerContext);
+                readerContext,
+                shardMetricGroupMap);
     }
 
     @Override
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java
new file mode 100644
index 0000000..c46d539
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.arns.Arn;
+
+/** A utility class for handling Kinesis shard metrics. */
+@Internal
+public class KinesisShardMetrics {
+    private static final Logger log = 
LoggerFactory.getLogger(KinesisShardMetrics.class);
+    private final MetricGroup metricGroup;
+    private final KinesisShardSplit shardInfo;
+    private volatile long millisBehindLatest = -1L;
+
+    public KinesisShardMetrics(KinesisShardSplit shard, MetricGroup 
rootMetricGroup) {
+        this.shardInfo = shard;
+
+        Arn streamArn = Arn.fromString(shard.getStreamArn());
+        this.metricGroup =
+                rootMetricGroup
+                        
.addGroup(MetricConstants.KINESIS_STREAM_SOURCE_METRIC_GROUP)
+                        .addGroup(
+                                MetricConstants.ACCOUNT_ID_METRIC_GROUP,
+                                streamArn.accountId().get())
+                        .addGroup(MetricConstants.REGION_METRIC_GROUP, 
streamArn.region().get())
+                        .addGroup(
+                                MetricConstants.STREAM_METRIC_GROUP,
+                                streamArn.resource().resource())
+                        .addGroup(MetricConstants.SHARD_METRIC_GROUP, 
shard.getShardId());
+
+        this.metricGroup.gauge(MetricConstants.MILLIS_BEHIND_LATEST, 
this::getMillisBehindLatest);
+
+        log.debug(
+                "Registered metric with identifier: {}",
+                
metricGroup.getMetricIdentifier(MetricConstants.MILLIS_BEHIND_LATEST));
+    }
+
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    public long getMillisBehindLatest() {
+        return millisBehindLatest;
+    }
+
+    public void setMillisBehindLatest(long millisBehindLatest) {
+        log.debug(
+                "Updating millisBehindLatest metric for shard {} to {}",
+                shardInfo.getShardId(),
+                millisBehindLatest);
+
+        this.millisBehindLatest = millisBehindLatest;
+    }
+
+    public void unregister() {
+        ((AbstractMetricGroup) metricGroup).close();
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java
new file mode 100644
index 0000000..683946e
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.metrics;
+
+import org.apache.flink.annotation.Internal;
+
+/** A collection of consumer metric related constant names. */
+@Internal
+public class MetricConstants {
+
+    public static final String KINESIS_STREAM_SOURCE_METRIC_GROUP = 
"KinesisStreamSource";
+    public static final String STREAM_METRIC_GROUP = "stream";
+    public static final String SHARD_METRIC_GROUP = "shardId";
+    public static final String REGION_METRIC_GROUP = "region";
+    public static final String ACCOUNT_ID_METRIC_GROUP = "accountId";
+
+    public static final String MILLIS_BEHIND_LATEST = "millisBehindLatest";
+}
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
index 30bf2cf..0eb7c1a 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
@@ -26,11 +26,15 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -43,18 +47,23 @@ public class KinesisStreamsSourceReader<T>
         extends SingleThreadMultiplexSourceReaderBase<
                 Record, T, KinesisShardSplit, KinesisShardSplitState> {
 
+    private static final Logger log = 
LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
+    private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
+
     public KinesisStreamsSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> 
elementsQueue,
             SingleThreadFetcherManager<Record, KinesisShardSplit> 
splitFetcherManager,
             RecordEmitter<Record, T, KinesisShardSplitState> recordEmitter,
             Configuration config,
-            SourceReaderContext context) {
+            SourceReaderContext context,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap) {
         super(elementsQueue, splitFetcherManager, recordEmitter, config, 
context);
+        this.shardMetricGroupMap = shardMetricGroupMap;
     }
 
     @Override
     protected void onSplitFinished(Map<String, KinesisShardSplitState> 
finishedSplitIds) {
-        // no-op. We don't need to do anything on finished split now
+        finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
     }
 
     @Override
@@ -66,4 +75,38 @@ public class KinesisStreamsSourceReader<T>
     protected KinesisShardSplit toSplitType(String splitId, 
KinesisShardSplitState splitState) {
         return splitState.getKinesisShardSplit();
     }
+
+    @Override
+    public void addSplits(List<KinesisShardSplit> splits) {
+        splits.forEach(this::registerShardMetricGroup);
+        super.addSplits(splits);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        
this.shardMetricGroupMap.keySet().forEach(this::unregisterShardMetricGroup);
+    }
+
+    private void registerShardMetricGroup(KinesisShardSplit split) {
+        if (!this.shardMetricGroupMap.containsKey(split.getShardId())) {
+            this.shardMetricGroupMap.put(
+                    split.getShardId(), new KinesisShardMetrics(split, 
context.metricGroup()));
+        } else {
+            log.warn(
+                    "Metric group for shard with id {} has already been 
registered.",
+                    split.getShardId());
+        }
+    }
+
+    private void unregisterShardMetricGroup(String shardId) {
+        KinesisShardMetrics removed = this.shardMetricGroupMap.remove(shardId);
+        if (removed != null) {
+            removed.unregister();
+        } else {
+            log.warn(
+                    "Shard metric group unregister failed. Metric group for {} 
does not exist.",
+                    shardId);
+        }
+    }
 }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
index 354dc26..cdf2253 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
@@ -37,6 +38,7 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import static java.util.Collections.singleton;
@@ -53,9 +55,12 @@ public class PollingKinesisShardSplitReader implements 
SplitReader<Record, Kines
 
     private final StreamProxy kinesis;
     private final Deque<KinesisShardSplitState> assignedSplits = new 
ArrayDeque<>();
+    private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
 
-    public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) {
+    public PollingKinesisShardSplitReader(
+            StreamProxy kinesisProxy, Map<String, KinesisShardMetrics> 
shardMetricGroupMap) {
         this.kinesis = kinesisProxy;
+        this.shardMetricGroupMap = shardMetricGroupMap;
     }
 
     @Override
@@ -72,6 +77,10 @@ public class PollingKinesisShardSplitReader implements 
SplitReader<Record, Kines
                         splitState.getNextStartingPosition());
         boolean isComplete = getRecordsResponse.nextShardIterator() == null;
 
+        shardMetricGroupMap
+                .get(splitState.getShardId())
+                
.setMillisBehindLatest(getRecordsResponse.millisBehindLatest());
+
         if (hasNoRecords(getRecordsResponse)) {
             if (isComplete) {
                 return new KinesisRecordsWithSplitIds(
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetricsTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetricsTest.java
new file mode 100644
index 0000000..4563816
--- /dev/null
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetricsTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.metrics;
+
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.util.TestUtil;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.arns.Arn;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class KinesisShardMetricsTest {
+    private static final KinesisShardSplit TEST_SPLIT =
+            TestUtil.getTestSplit(TestUtil.STREAM_ARN, 
TestUtil.generateShardId(1));
+
+    private MetricListener metricListener;
+    private KinesisShardMetrics kinesisShardMetrics;
+
+    @BeforeEach
+    public void init() {
+        metricListener = new MetricListener();
+        kinesisShardMetrics =
+                new KinesisShardMetrics(
+                        TEST_SPLIT,
+                        
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+    }
+
+    @Test
+    void testRegisterShardMetricGroup() {
+        TestUtil.assertMillisBehindLatest(TEST_SPLIT, -1L, metricListener);
+    }
+
+    @Test
+    void testKinesisMetricIdentifier() {
+        Arn streamArn = Arn.fromString(TEST_SPLIT.getStreamArn());
+
+        String expectedIdentifier =
+                String.join(
+                        ".",
+                        MetricConstants.KINESIS_STREAM_SOURCE_METRIC_GROUP,
+                        MetricConstants.ACCOUNT_ID_METRIC_GROUP,
+                        streamArn.accountId().get(),
+                        MetricConstants.REGION_METRIC_GROUP,
+                        streamArn.region().get(),
+                        MetricConstants.STREAM_METRIC_GROUP,
+                        streamArn.resource().resource(),
+                        MetricConstants.SHARD_METRIC_GROUP,
+                        TEST_SPLIT.getShardId(),
+                        MetricConstants.MILLIS_BEHIND_LATEST);
+
+        
assertThat(metricListener.getGauge(expectedIdentifier).isPresent()).isTrue();
+    }
+
+    @Test
+    void testUpdateShardMillisBehindLatest() {
+        kinesisShardMetrics.setMillisBehindLatest(100L);
+        TestUtil.assertMillisBehindLatest(TEST_SPLIT, 100L, metricListener);
+
+        kinesisShardMetrics.setMillisBehindLatest(10000L);
+        TestUtil.assertMillisBehindLatest(TEST_SPLIT, 10000L, metricListener);
+
+        kinesisShardMetrics.setMillisBehindLatest(1000000L);
+        TestUtil.assertMillisBehindLatest(TEST_SPLIT, 1000000L, 
metricListener);
+    }
+
+    @Test
+    void testUnregisterShardMetricGroup() {
+        assertThat(((AbstractMetricGroup) 
kinesisShardMetrics.getMetricGroup()).isClosed())
+                .isFalse();
+        kinesisShardMetrics.unregister();
+        assertThat(((AbstractMetricGroup) 
kinesisShardMetrics.getMetricGroup()).isClosed())
+                .isTrue();
+    }
+}
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java
index 7e5381c..55bd2be 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java
@@ -19,27 +19,62 @@
 package org.apache.flink.connector.kinesis.source.reader;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.model.TestData;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
-import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.kinesis.source.util.KinesisContextProvider;
+import org.apache.flink.connector.kinesis.source.util.TestUtil;
+import org.apache.flink.metrics.testutils.MetricListener;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.kinesis.model.Record;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
+import static 
org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplitState;
-import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
 class KinesisStreamsSourceReaderTest {
+    private KinesisStreamsSourceReader<TestData> sourceReader;
+    private MetricListener metricListener;
+    private Map<String, KinesisShardMetrics> shardMetricGroupMap;
+
+    @BeforeEach
+    public void init() {
+        metricListener = new MetricListener();
+        shardMetricGroupMap = new ConcurrentHashMap<>();
+        StreamProxy testStreamProxy = getTestStreamProxy();
+        Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
+                () -> new PollingKinesisShardSplitReader(testStreamProxy, 
shardMetricGroupMap);
+
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+
+        sourceReader =
+                new KinesisStreamsSourceReader<>(
+                        elementsQueue,
+                        new SingleThreadFetcherManager<>(elementsQueue, 
splitReaderSupplier::get),
+                        new KinesisStreamsRecordEmitter<>(null),
+                        new Configuration(),
+                        
KinesisContextProvider.KinesisTestingContext.getKinesisTestingContext(
+                                metricListener),
+                        shardMetricGroupMap);
+    }
 
     @Test
     void testInitializedState() throws Exception {
-        KinesisStreamsSourceReader<TestData> sourceReader =
-                new KinesisStreamsSourceReader<>(
-                        null, null, null, new Configuration(), new 
TestingReaderContext());
         KinesisShardSplit split = getTestSplit();
         assertThat(sourceReader.initializedState(split))
                 .usingRecursiveComparison()
@@ -48,9 +83,6 @@ class KinesisStreamsSourceReaderTest {
 
     @Test
     void testToSplitType() throws Exception {
-        KinesisStreamsSourceReader<TestData> sourceReader =
-                new KinesisStreamsSourceReader<>(
-                        null, null, null, new Configuration(), new 
TestingReaderContext());
         KinesisShardSplitState splitState = getTestSplitState();
         String splitId = splitState.getSplitId();
         assertThat(sourceReader.toSplitType(splitId, splitState))
@@ -59,11 +91,35 @@ class KinesisStreamsSourceReaderTest {
     }
 
     @Test
-    void testOnSplitFinishedIsNoOp() throws Exception {
-        KinesisStreamsSourceReader<TestData> sourceReader =
-                new KinesisStreamsSourceReader<>(
-                        null, null, null, new Configuration(), new 
TestingReaderContext());
-        assertThatNoException()
-                .isThrownBy(() -> 
sourceReader.onSplitFinished(Collections.emptyMap()));
+    void testOnSplitFinishedShardMetricGroupUnregistered() throws Exception {
+        KinesisShardSplit split = getTestSplit();
+
+        List<KinesisShardSplit> splits = Collections.singletonList(split);
+
+        sourceReader.addSplits(splits);
+        sourceReader.isAvailable().get();
+
+        assertThat(shardMetricGroupMap.get(split.getShardId())).isNotNull();
+
+        sourceReader.onSplitFinished(
+                Collections.singletonMap(split.getShardId(), new 
KinesisShardSplitState(split)));
+
+        assertThat(shardMetricGroupMap.get(split.getShardId())).isNull();
+    }
+
+    @Test
+    void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception {
+        KinesisShardSplit split = getTestSplit();
+
+        List<KinesisShardSplit> splits = Collections.singletonList(split);
+        sourceReader.addSplits(splits);
+
+        // Wait for fetcher tasks to finish to assert after the metric is 
registered and updated.
+        sourceReader.isAvailable().get();
+
+        assertThat(shardMetricGroupMap.get(split.getShardId())).isNotNull();
+
+        TestUtil.assertMillisBehindLatest(
+                split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, 
metricListener);
     }
 }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
index 1e5c20d..09f27d5 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
@@ -20,34 +20,53 @@ package org.apache.flink.connector.kinesis.source.reader;
 
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
-import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import 
org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.TestKinesisStreamProxy;
 import org.apache.flink.connector.kinesis.source.util.TestUtil;
+import org.apache.flink.metrics.testutils.MetricListener;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy;
-import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestRecord;
 import static 
org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
 import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
 class PollingKinesisShardSplitReaderTest {
+    private PollingKinesisShardSplitReader splitReader;
+    private TestKinesisStreamProxy testStreamProxy;
+    private MetricListener metricListener;
+    private Map<String, KinesisShardMetrics> shardMetricGroupMap;
+    private static final String TEST_SHARD_ID = TestUtil.generateShardId(1);
+
+    @BeforeEach
+    public void init() {
+        testStreamProxy = getTestStreamProxy();
+        metricListener = new MetricListener();
+        shardMetricGroupMap = new ConcurrentHashMap<>();
+
+        shardMetricGroupMap.put(
+                TEST_SHARD_ID,
+                new KinesisShardMetrics(
+                        TestUtil.getTestSplit(TEST_SHARD_ID), 
metricListener.getMetricGroup()));
+        splitReader = new PollingKinesisShardSplitReader(testStreamProxy, 
shardMetricGroupMap);
+    }
+
     @Test
     void testNoAssignedSplitsHandledGracefully() throws Exception {
-        StreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
 
         assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
@@ -57,15 +76,10 @@ class PollingKinesisShardSplitReaderTest {
 
     @Test
     void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception {
-        TestKinesisStreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         // Given assigned split with no records
-        String shardId = generateShardId(1);
-        testStreamProxy.addShards(shardId);
+        testStreamProxy.addShards(TEST_SHARD_ID);
         splitReader.handleSplitsChanges(
-                new 
SplitsAddition<>(Collections.singletonList(getTestSplit(shardId))));
+                new 
SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
 
         // When fetching records
         RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
@@ -78,24 +92,25 @@ class PollingKinesisShardSplitReaderTest {
 
     @Test
     void testSingleAssignedSplitAllConsumed() throws Exception {
-        TestKinesisStreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         // Given assigned split with records
-        String shardId = generateShardId(1);
-        testStreamProxy.addShards(shardId);
+        testStreamProxy.addShards(TEST_SHARD_ID);
         List<Record> expectedRecords =
                 Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
                         .collect(Collectors.toList());
         testStreamProxy.addRecords(
-                TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(0)));
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(0)));
         testStreamProxy.addRecords(
-                TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(1)));
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(1)));
         testStreamProxy.addRecords(
-                TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(2)));
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(2)));
         splitReader.handleSplitsChanges(
-                new SplitsAddition<>(Collections.singletonList(getTestSplit(shardId))));
+                new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
 
         // When fetching records
         List<Record> records = new ArrayList<>();
@@ -109,24 +124,25 @@ class PollingKinesisShardSplitReaderTest {
 
     @Test
     void testMultipleAssignedSplitsAllConsumed() throws Exception {
-        TestKinesisStreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         // Given assigned split with records
-        String shardId = generateShardId(1);
-        testStreamProxy.addShards(shardId);
+        testStreamProxy.addShards(TEST_SHARD_ID);
         List<Record> expectedRecords =
                 Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
                         .collect(Collectors.toList());
         testStreamProxy.addRecords(
-                TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(0)));
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(0)));
         testStreamProxy.addRecords(
-                TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(1)));
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(1)));
         testStreamProxy.addRecords(
-                TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(2)));
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(2)));
         splitReader.handleSplitsChanges(
-                new SplitsAddition<>(Collections.singletonList(getTestSplit(shardId))));
+                new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
 
         // When records are fetched
         List<Record> fetchedRecords = new ArrayList<>();
@@ -141,15 +157,10 @@ class PollingKinesisShardSplitReaderTest {
 
     @Test
     void testHandleEmptyCompletedShard() throws Exception {
-        TestKinesisStreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         // Given assigned split with no records, and the shard is complete
-        String shardId = generateShardId(1);
-        testStreamProxy.addShards(shardId);
-        testStreamProxy.addRecords(TestUtil.STREAM_ARN, shardId, Collections.emptyList());
-        KinesisShardSplit split = getTestSplit(shardId);
+        testStreamProxy.addShards(TEST_SHARD_ID);
+        testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.emptyList());
+        KinesisShardSplit split = getTestSplit(TEST_SHARD_ID);
         splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
         testStreamProxy.setShouldCompleteNextShard(true);
 
@@ -164,18 +175,13 @@ class PollingKinesisShardSplitReaderTest {
 
     @Test
     void testFinishedSplitsReturned() throws Exception {
-        TestKinesisStreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         // Given assigned split with records from completed shard
-        String shardId = generateShardId(1);
-        testStreamProxy.addShards(shardId);
+        testStreamProxy.addShards(TEST_SHARD_ID);
         List<Record> expectedRecords =
                 Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
                         .collect(Collectors.toList());
-        testStreamProxy.addRecords(TestUtil.STREAM_ARN, shardId, expectedRecords);
-        KinesisShardSplit split = getTestSplit(shardId);
+        testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, expectedRecords);
+        KinesisShardSplit split = getTestSplit(TEST_SHARD_ID);
         splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
 
         // When fetching records
@@ -197,23 +203,30 @@ class PollingKinesisShardSplitReaderTest {
 
     @Test
     void testWakeUpIsNoOp() {
-        TestKinesisStreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         assertThatNoException().isThrownBy(splitReader::wakeUp);
     }
 
     @Test
     void testCloseClosesStreamProxy() {
-        TestKinesisStreamProxy testStreamProxy = getTestStreamProxy();
-        PollingKinesisShardSplitReader splitReader =
-                new PollingKinesisShardSplitReader(testStreamProxy);
-
         assertThatNoException().isThrownBy(splitReader::close);
         assertThat(testStreamProxy.isClosed()).isTrue();
     }
 
+    @Test
+    void testFetchUpdatesTheMillisBehindLatestMetric() throws IOException {
+        KinesisShardSplit split = getTestSplit();
+        shardMetricGroupMap.put(
+                split.getShardId(),
+                new KinesisShardMetrics(split, metricListener.getMetricGroup()));
+        TestUtil.assertMillisBehindLatest(split, -1L, metricListener);
+
+        splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
+
+        splitReader.fetch();
+        TestUtil.assertMillisBehindLatest(
+                split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener);
+    }
+
     private List<Record> readAllRecords(RecordsWithSplitIds<Record> recordsWithSplitIds) {
         List<Record> outputRecords = new ArrayList<>();
         Record record;
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisContextProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisContextProvider.java
new file mode 100644
index 0000000..c68870c
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisContextProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.util;
+
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+
+/** Provides {@link TestingReaderContext} with mocked Kinesis Stream behavior. */
+public class KinesisContextProvider {
+    /**
+     * An implementation of the {@link TestingReaderContext} that allows control over ReaderContext
+     * methods.
+     */
+    public static class KinesisTestingContext extends TestingReaderContext {
+
+        public static KinesisTestingContext getKinesisTestingContext(
+                MetricListener metricListener) {
+            return new KinesisTestingContext(metricListener);
+        }
+
+        private final MetricListener metricListener;
+
+        public KinesisTestingContext(MetricListener metricListener) {
+            this.metricListener = metricListener;
+        }
+
+        @Override
+        public SourceReaderMetricGroup metricGroup() {
+            return InternalSourceReaderMetricGroup.mock(this.metricListener.getMetricGroup());
+        }
+    }
+}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
index ff9c439..0651139 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java
@@ -100,6 +100,7 @@ public class KinesisStreamProxyProvider {
             return GetRecordsResponse.builder()
                     .records(records)
                     .nextShardIterator(shouldCompleteNextShard ? null : "some-shard-iterator")
+                    .millisBehindLatest(TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE)
                     .build();
         }
 
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
index c8c9963..eb9394a 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
@@ -20,14 +20,21 @@ package org.apache.flink.connector.kinesis.source.util;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.connector.kinesis.source.metrics.MetricConstants;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
 import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.testutils.MetricListener;
 
+import software.amazon.awssdk.arns.Arn;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
 import java.time.Instant;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Utilities class for testing Kinesis Source. */
 public class TestUtil {
@@ -36,6 +43,7 @@ public class TestUtil {
             "arn:aws:kinesis:us-east-1:123456789012:stream/keenesesStream";
     public static final String SHARD_ID = "shardId-000000000002";
     public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
+    public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = 100L;
 
     public static String generateShardId(int shardId) {
         return String.format("shardId-%012d", shardId);
@@ -75,4 +83,25 @@ public class TestUtil {
                 .approximateArrivalTimestamp(Instant.now())
                 .build();
     }
+
+    public static void assertMillisBehindLatest(
+            KinesisShardSplit split, long expectedValue, MetricListener metricListener) {
+        Arn kinesisArn = Arn.fromString(split.getStreamArn());
+
+        final Optional<Gauge<Long>> millisBehindLatestGauge =
+                metricListener.getGauge(
+                        MetricConstants.KINESIS_STREAM_SOURCE_METRIC_GROUP,
+                        MetricConstants.ACCOUNT_ID_METRIC_GROUP,
+                        kinesisArn.accountId().get(),
+                        MetricConstants.REGION_METRIC_GROUP,
+                        kinesisArn.region().get(),
+                        MetricConstants.STREAM_METRIC_GROUP,
+                        kinesisArn.resource().resource(),
+                        MetricConstants.SHARD_METRIC_GROUP,
+                        split.getShardId(),
+                        MetricConstants.MILLIS_BEHIND_LATEST);
+
+        assertThat(millisBehindLatestGauge).isPresent();
+        assertThat((long) millisBehindLatestGauge.get().getValue()).isEqualTo(expectedValue);
+    }
 }


Reply via email to