This is an automated email from the ASF dual-hosted git repository. hong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push: new 9e418f7 [FLINK-32229][Connector/Kinesis] Add millisBehindLatest metric to Kinesis source implementation 9e418f7 is described below commit 9e418f7c447faf36517b69f1199e3f46e27c9ad2 Author: Burak Ozakinci <bura...@amazon.co.uk> AuthorDate: Fri Jun 14 22:42:26 2024 +0100 [FLINK-32229][Connector/Kinesis] Add millisBehindLatest metric to Kinesis source implementation --- .../kinesis/source/KinesisStreamsSource.java | 13 ++- .../source/metrics/KinesisShardMetrics.java | 81 +++++++++++++ .../kinesis/source/metrics/MetricConstants.java | 34 ++++++ .../source/reader/KinesisStreamsSourceReader.java | 47 +++++++- .../reader/PollingKinesisShardSplitReader.java | 11 +- .../source/metrics/KinesisShardMetricsTest.java | 95 +++++++++++++++ .../reader/KinesisStreamsSourceReaderTest.java | 84 +++++++++++--- .../reader/PollingKinesisShardSplitReaderTest.java | 127 ++++++++++++--------- .../source/util/KinesisContextProvider.java | 50 ++++++++ .../source/util/KinesisStreamProxyProvider.java | 1 + .../connector/kinesis/source/util/TestUtil.java | 29 +++++ 11 files changed, 496 insertions(+), 76 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 4ed24c0..c3455ee 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -37,6 +37,7 @@ import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer; +import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter; import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader; @@ -55,7 +56,9 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.utils.AttributeMap; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; /** @@ -124,10 +127,15 @@ public class KinesisStreamsSource<T> FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> elementsQueue = new FutureCompletingBlockingQueue<>(); + + Map<String, KinesisShardMetrics> shardMetricGroupMap = new ConcurrentHashMap<>(); + // We create a new stream proxy for each split reader since they have their own independent // lifecycle. Supplier<PollingKinesisShardSplitReader> splitReaderSupplier = - () -> new PollingKinesisShardSplitReader(createKinesisStreamProxy(sourceConfig)); + () -> + new PollingKinesisShardSplitReader( + createKinesisStreamProxy(sourceConfig), shardMetricGroupMap); KinesisStreamsRecordEmitter<T> recordEmitter = new KinesisStreamsRecordEmitter<>(deserializationSchema); @@ -136,7 +144,8 @@ public class KinesisStreamsSource<T> new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get), recordEmitter, sourceConfig, - readerContext); + readerContext, + shardMetricGroupMap); } @Override diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java new file mode 100644 index 0000000..c46d539 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetrics.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.metrics; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; + +/** A utility class for handling Kinesis shard metrics. */ +@Internal +public class KinesisShardMetrics { + private static final Logger log = LoggerFactory.getLogger(KinesisShardMetrics.class); + private final MetricGroup metricGroup; + private final KinesisShardSplit shardInfo; + private volatile long millisBehindLatest = -1L; + + public KinesisShardMetrics(KinesisShardSplit shard, MetricGroup rootMetricGroup) { + this.shardInfo = shard; + + Arn streamArn = Arn.fromString(shard.getStreamArn()); + this.metricGroup = + rootMetricGroup + .addGroup(MetricConstants.KINESIS_STREAM_SOURCE_METRIC_GROUP) + .addGroup( + MetricConstants.ACCOUNT_ID_METRIC_GROUP, + streamArn.accountId().get()) + .addGroup(MetricConstants.REGION_METRIC_GROUP, streamArn.region().get()) + .addGroup( + MetricConstants.STREAM_METRIC_GROUP, + streamArn.resource().resource()) + .addGroup(MetricConstants.SHARD_METRIC_GROUP, shard.getShardId()); + + this.metricGroup.gauge(MetricConstants.MILLIS_BEHIND_LATEST, this::getMillisBehindLatest); + + log.debug( + "Registered metric with identifier: {}", + metricGroup.getMetricIdentifier(MetricConstants.MILLIS_BEHIND_LATEST)); + } + + public MetricGroup getMetricGroup() { + return metricGroup; + } + + public long getMillisBehindLatest() { + return millisBehindLatest; + } + + public void setMillisBehindLatest(long millisBehindLatest) { + log.debug( + "Updating millisBehindLatest metric for shard {} to {}", + shardInfo.getShardId(), + millisBehindLatest); + + this.millisBehindLatest = millisBehindLatest; + } + + public void unregister() { + ((AbstractMetricGroup) metricGroup).close(); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java new file mode 100644 index 0000000..683946e --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/metrics/MetricConstants.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.metrics; + +import org.apache.flink.annotation.Internal; + +/** A collection of consumer metric related constant names. */ +@Internal +public class MetricConstants { + + public static final String KINESIS_STREAM_SOURCE_METRIC_GROUP = "KinesisStreamSource"; + public static final String STREAM_METRIC_GROUP = "stream"; + public static final String SHARD_METRIC_GROUP = "shardId"; + public static final String REGION_METRIC_GROUP = "region"; + public static final String ACCOUNT_ID_METRIC_GROUP = "accountId"; + + public static final String MILLIS_BEHIND_LATEST = "millisBehindLatest"; +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java index 30bf2cf..0eb7c1a 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java @@ -26,11 +26,15 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.Record; +import java.util.List; import java.util.Map; /** @@ -43,18 +47,23 @@ public class KinesisStreamsSourceReader<T> extends SingleThreadMultiplexSourceReaderBase< Record, T, KinesisShardSplit, KinesisShardSplitState> { + private static final Logger log = LoggerFactory.getLogger(KinesisStreamsSourceReader.class); + private final Map<String, KinesisShardMetrics> shardMetricGroupMap; + public KinesisStreamsSourceReader( FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> elementsQueue, SingleThreadFetcherManager<Record, KinesisShardSplit> splitFetcherManager, RecordEmitter<Record, T, KinesisShardSplitState> recordEmitter, Configuration config, - SourceReaderContext context) { + SourceReaderContext context, + Map<String, KinesisShardMetrics> shardMetricGroupMap) { super(elementsQueue, splitFetcherManager, recordEmitter, config, context); + this.shardMetricGroupMap = shardMetricGroupMap; } @Override protected void onSplitFinished(Map<String, KinesisShardSplitState> finishedSplitIds) { - // no-op. We don't need to do anything on finished split now + finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup); } @Override @@ -66,4 +75,38 @@ public class KinesisStreamsSourceReader<T> protected KinesisShardSplit toSplitType(String splitId, KinesisShardSplitState splitState) { return splitState.getKinesisShardSplit(); } + + @Override + public void addSplits(List<KinesisShardSplit> splits) { + splits.forEach(this::registerShardMetricGroup); + super.addSplits(splits); + } + + @Override + public void close() throws Exception { + super.close(); + this.shardMetricGroupMap.keySet().forEach(this::unregisterShardMetricGroup); + } + + private void registerShardMetricGroup(KinesisShardSplit split) { + if (!this.shardMetricGroupMap.containsKey(split.getShardId())) { + this.shardMetricGroupMap.put( + split.getShardId(), new KinesisShardMetrics(split, context.metricGroup())); + } else { + log.warn( + "Metric group for shard with id {} has already been registered.", + split.getShardId()); + } + } + + private void unregisterShardMetricGroup(String shardId) { + KinesisShardMetrics removed = this.shardMetricGroupMap.remove(shardId); + if (removed != null) { + removed.unregister(); + } else { + log.warn( + "Shard metric group unregister failed. Metric group for {} does not exist.", + shardId); + } + } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java index 354dc26..cdf2253 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; @@ -37,6 +38,7 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; import java.util.Iterator; +import java.util.Map; import java.util.Set; import static java.util.Collections.singleton; @@ -53,9 +55,12 @@ public class PollingKinesisShardSplitReader implements SplitReader<Record, Kines private final StreamProxy kinesis; private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>(); + private final Map<String, KinesisShardMetrics> shardMetricGroupMap; - public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) { + public PollingKinesisShardSplitReader( + StreamProxy kinesisProxy, Map<String, KinesisShardMetrics> shardMetricGroupMap) { this.kinesis = kinesisProxy; + this.shardMetricGroupMap = shardMetricGroupMap; } @Override @@ -72,6 +77,10 @@ public class PollingKinesisShardSplitReader implements SplitReader<Record, Kines splitState.getNextStartingPosition()); boolean isComplete = getRecordsResponse.nextShardIterator() == null; + shardMetricGroupMap + .get(splitState.getShardId()) + .setMillisBehindLatest(getRecordsResponse.millisBehindLatest()); + if (hasNoRecords(getRecordsResponse)) { if (isComplete) { return new KinesisRecordsWithSplitIds( diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetricsTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetricsTest.java new file mode 100644 index 0000000..4563816 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/metrics/KinesisShardMetricsTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.metrics; + +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.util.TestUtil; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.arns.Arn; + +import static org.assertj.core.api.Assertions.assertThat; + +class KinesisShardMetricsTest { + private static final KinesisShardSplit TEST_SPLIT = + TestUtil.getTestSplit(TestUtil.STREAM_ARN, TestUtil.generateShardId(1)); + + private MetricListener metricListener; + private KinesisShardMetrics kinesisShardMetrics; + + @BeforeEach + public void init() { + metricListener = new MetricListener(); + kinesisShardMetrics = + new KinesisShardMetrics( + TEST_SPLIT, + InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); + } + + @Test + void testRegisterShardMetricGroup() { + TestUtil.assertMillisBehindLatest(TEST_SPLIT, -1L, metricListener); + } + + @Test + void testKinesisMetricIdentifier() { + Arn streamArn = Arn.fromString(TEST_SPLIT.getStreamArn()); + + String expectedIdentifier = + String.join( + ".", + MetricConstants.KINESIS_STREAM_SOURCE_METRIC_GROUP, + MetricConstants.ACCOUNT_ID_METRIC_GROUP, + streamArn.accountId().get(), + MetricConstants.REGION_METRIC_GROUP, + streamArn.region().get(), + MetricConstants.STREAM_METRIC_GROUP, + streamArn.resource().resource(), + MetricConstants.SHARD_METRIC_GROUP, + TEST_SPLIT.getShardId(), + MetricConstants.MILLIS_BEHIND_LATEST); + + assertThat(metricListener.getGauge(expectedIdentifier).isPresent()).isTrue(); + } + + @Test + void testUpdateShardMillisBehindLatest() { + kinesisShardMetrics.setMillisBehindLatest(100L); + TestUtil.assertMillisBehindLatest(TEST_SPLIT, 100L, metricListener); + + kinesisShardMetrics.setMillisBehindLatest(10000L); + TestUtil.assertMillisBehindLatest(TEST_SPLIT, 10000L, metricListener); + + kinesisShardMetrics.setMillisBehindLatest(1000000L); + TestUtil.assertMillisBehindLatest(TEST_SPLIT, 1000000L, metricListener); + } + + @Test + void testUnregisterShardMetricGroup() { + assertThat(((AbstractMetricGroup) kinesisShardMetrics.getMetricGroup()).isClosed()) + .isFalse(); + kinesisShardMetrics.unregister(); + assertThat(((AbstractMetricGroup) kinesisShardMetrics.getMetricGroup()).isClosed()) + .isTrue(); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java index 7e5381c..55bd2be 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReaderTest.java @@ -19,27 +19,62 @@ package org.apache.flink.connector.kinesis.source.reader; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.model.TestData; +import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; -import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.connector.kinesis.source.util.KinesisContextProvider; +import org.apache.flink.connector.kinesis.source.util.TestUtil; +import org.apache.flink.metrics.testutils.MetricListener; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.kinesis.model.Record; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy; import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit; import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplitState; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; class KinesisStreamsSourceReaderTest { + private KinesisStreamsSourceReader<TestData> sourceReader; + private MetricListener metricListener; + private Map<String, KinesisShardMetrics> shardMetricGroupMap; + + @BeforeEach + public void init() { + metricListener = new MetricListener(); + shardMetricGroupMap = new ConcurrentHashMap<>(); + StreamProxy testStreamProxy = getTestStreamProxy(); + Supplier<PollingKinesisShardSplitReader> splitReaderSupplier = + () -> new PollingKinesisShardSplitReader(testStreamProxy, shardMetricGroupMap); + + FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + + sourceReader = + new KinesisStreamsSourceReader<>( + elementsQueue, + new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get), + new KinesisStreamsRecordEmitter<>(null), + new Configuration(), + KinesisContextProvider.KinesisTestingContext.getKinesisTestingContext( + metricListener), + shardMetricGroupMap); + } @Test void testInitializedState() throws Exception { - KinesisStreamsSourceReader<TestData> sourceReader = - new KinesisStreamsSourceReader<>( - null, null, null, new Configuration(), new TestingReaderContext()); KinesisShardSplit split = getTestSplit(); assertThat(sourceReader.initializedState(split)) .usingRecursiveComparison() @@ -48,9 +83,6 @@ class KinesisStreamsSourceReaderTest { @Test void testToSplitType() throws Exception { - KinesisStreamsSourceReader<TestData> sourceReader = - new KinesisStreamsSourceReader<>( - null, null, null, new Configuration(), new TestingReaderContext()); KinesisShardSplitState splitState = getTestSplitState(); String splitId = splitState.getSplitId(); assertThat(sourceReader.toSplitType(splitId, splitState)) @@ -59,11 +91,35 @@ class KinesisStreamsSourceReaderTest { } @Test - void testOnSplitFinishedIsNoOp() throws Exception { - KinesisStreamsSourceReader<TestData> sourceReader = - new KinesisStreamsSourceReader<>( - null, null, null, new Configuration(), new TestingReaderContext()); - assertThatNoException() - .isThrownBy(() -> sourceReader.onSplitFinished(Collections.emptyMap())); + void testOnSplitFinishedShardMetricGroupUnregistered() throws Exception { + KinesisShardSplit split = getTestSplit(); + + List<KinesisShardSplit> splits = Collections.singletonList(split); + + sourceReader.addSplits(splits); + sourceReader.isAvailable().get(); + + assertThat(shardMetricGroupMap.get(split.getShardId())).isNotNull(); + + sourceReader.onSplitFinished( + Collections.singletonMap(split.getShardId(), new KinesisShardSplitState(split))); + + assertThat(shardMetricGroupMap.get(split.getShardId())).isNull(); + } + + @Test + void testAddSplitsRegistersAndUpdatesShardMetricGroup() throws Exception { + KinesisShardSplit split = getTestSplit(); + + List<KinesisShardSplit> splits = Collections.singletonList(split); + sourceReader.addSplits(splits); + + // Wait for fetcher tasks to finish to assert after the metric is registered and updated. + sourceReader.isAvailable().get(); + + assertThat(shardMetricGroupMap.get(split.getShardId())).isNotNull(); + + TestUtil.assertMillisBehindLatest( + split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener); } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java index 1e5c20d..09f27d5 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java @@ -20,34 +20,53 @@ package org.apache.flink.connector.kinesis.source.reader; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; -import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.TestKinesisStreamProxy; import org.apache.flink.connector.kinesis.source.util.TestUtil; +import org.apache.flink.metrics.testutils.MetricListener; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.kinesis.model.Record; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy; -import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestRecord; import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit; import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; class PollingKinesisShardSplitReaderTest { + private PollingKinesisShardSplitReader splitReader; + private TestKinesisStreamProxy testStreamProxy; + private MetricListener metricListener; + private Map<String, KinesisShardMetrics> shardMetricGroupMap; + private static final String TEST_SHARD_ID = TestUtil.generateShardId(1); + + @BeforeEach + public void init() { + testStreamProxy = getTestStreamProxy(); + metricListener = new MetricListener(); + shardMetricGroupMap = new ConcurrentHashMap<>(); + + shardMetricGroupMap.put( + TEST_SHARD_ID, + new KinesisShardMetrics( + TestUtil.getTestSplit(TEST_SHARD_ID), metricListener.getMetricGroup())); + splitReader = new PollingKinesisShardSplitReader(testStreamProxy, shardMetricGroupMap); + } + @Test void testNoAssignedSplitsHandledGracefully() throws Exception { - StreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch(); assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); @@ -57,15 +76,10 @@ class PollingKinesisShardSplitReaderTest { @Test void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception { - TestKinesisStreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - // Given assigned split with no records - String shardId = generateShardId(1); - testStreamProxy.addShards(shardId); + testStreamProxy.addShards(TEST_SHARD_ID); splitReader.handleSplitsChanges( - new SplitsAddition<>(Collections.singletonList(getTestSplit(shardId)))); + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); // When fetching records RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch(); @@ -78,24 +92,25 @@ class PollingKinesisShardSplitReaderTest { @Test void testSingleAssignedSplitAllConsumed() throws Exception { - TestKinesisStreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - // Given assigned split with records - String shardId = generateShardId(1); - testStreamProxy.addShards(shardId); + testStreamProxy.addShards(TEST_SHARD_ID); List<Record> expectedRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(0))); + TestUtil.STREAM_ARN, + TEST_SHARD_ID, + Collections.singletonList(expectedRecords.get(0))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(1))); + TestUtil.STREAM_ARN, + TEST_SHARD_ID, + Collections.singletonList(expectedRecords.get(1))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(2))); + TestUtil.STREAM_ARN, + TEST_SHARD_ID, + Collections.singletonList(expectedRecords.get(2))); splitReader.handleSplitsChanges( - new SplitsAddition<>(Collections.singletonList(getTestSplit(shardId)))); + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); // When fetching records List<Record> records = new ArrayList<>(); @@ -109,24 +124,25 @@ class PollingKinesisShardSplitReaderTest { @Test void testMultipleAssignedSplitsAllConsumed() throws Exception { - TestKinesisStreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - // Given assigned split with records - String shardId = generateShardId(1); - testStreamProxy.addShards(shardId); + testStreamProxy.addShards(TEST_SHARD_ID); List<Record> expectedRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(0))); + TestUtil.STREAM_ARN, + TEST_SHARD_ID, + Collections.singletonList(expectedRecords.get(0))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(1))); + TestUtil.STREAM_ARN, + TEST_SHARD_ID, + Collections.singletonList(expectedRecords.get(1))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, shardId, Collections.singletonList(expectedRecords.get(2))); + TestUtil.STREAM_ARN, + TEST_SHARD_ID, + Collections.singletonList(expectedRecords.get(2))); splitReader.handleSplitsChanges( - new SplitsAddition<>(Collections.singletonList(getTestSplit(shardId)))); + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); // When records are fetched List<Record> fetchedRecords = new ArrayList<>(); @@ -141,15 +157,10 @@ class PollingKinesisShardSplitReaderTest { @Test void testHandleEmptyCompletedShard() throws Exception { - TestKinesisStreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - // Given assigned split with no records, and the shard is complete - String shardId = generateShardId(1); - testStreamProxy.addShards(shardId); - testStreamProxy.addRecords(TestUtil.STREAM_ARN, shardId, Collections.emptyList()); - KinesisShardSplit split = getTestSplit(shardId); + testStreamProxy.addShards(TEST_SHARD_ID); + testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.emptyList()); + KinesisShardSplit split = getTestSplit(TEST_SHARD_ID); splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split))); testStreamProxy.setShouldCompleteNextShard(true); @@ -164,18 +175,13 @@ class PollingKinesisShardSplitReaderTest { @Test void testFinishedSplitsReturned() throws Exception { - TestKinesisStreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - // Given assigned split with records from completed shard - String shardId = generateShardId(1); - testStreamProxy.addShards(shardId); + testStreamProxy.addShards(TEST_SHARD_ID); List<Record> expectedRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); - testStreamProxy.addRecords(TestUtil.STREAM_ARN, shardId, expectedRecords); - KinesisShardSplit split = getTestSplit(shardId); + testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, expectedRecords); + KinesisShardSplit split = getTestSplit(TEST_SHARD_ID); splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split))); // When fetching records @@ -197,23 +203,30 @@ class PollingKinesisShardSplitReaderTest { @Test void testWakeUpIsNoOp() { - TestKinesisStreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - assertThatNoException().isThrownBy(splitReader::wakeUp); } @Test void testCloseClosesStreamProxy() { - TestKinesisStreamProxy testStreamProxy = getTestStreamProxy(); - PollingKinesisShardSplitReader splitReader = - new PollingKinesisShardSplitReader(testStreamProxy); - assertThatNoException().isThrownBy(splitReader::close); assertThat(testStreamProxy.isClosed()).isTrue(); } + @Test + void testFetchUpdatesTheMillisBehindLatestMetric() throws IOException { + KinesisShardSplit split = getTestSplit(); + shardMetricGroupMap.put( + split.getShardId(), + new KinesisShardMetrics(split, metricListener.getMetricGroup())); + TestUtil.assertMillisBehindLatest(split, -1L, metricListener); + + splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split))); + + splitReader.fetch(); + TestUtil.assertMillisBehindLatest( + split, TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE, metricListener); + } + private List<Record> readAllRecords(RecordsWithSplitIds<Record> recordsWithSplitIds) { List<Record> outputRecords = new ArrayList<>(); Record record; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisContextProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisContextProvider.java new file mode 100644 index 0000000..c68870c --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisContextProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.util; + +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; + +/** Provides {@link TestingReaderContext} with mocked Kinesis Stream behavior. */ +public class KinesisContextProvider { + /** + * An implementation of the {@link TestingReaderContext} that allows control over ReaderContext + * methods. + */ + public static class KinesisTestingContext extends TestingReaderContext { + + public static KinesisTestingContext getKinesisTestingContext( + MetricListener metricListener) { + return new KinesisTestingContext(metricListener); + } + + private final MetricListener metricListener; + + public KinesisTestingContext(MetricListener metricListener) { + this.metricListener = metricListener; + } + + @Override + public SourceReaderMetricGroup metricGroup() { + return InternalSourceReaderMetricGroup.mock(this.metricListener.getMetricGroup()); + } + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java index ff9c439..0651139 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisStreamProxyProvider.java @@ -100,6 +100,7 @@ public class KinesisStreamProxyProvider { return GetRecordsResponse.builder() .records(records) .nextShardIterator(shouldCompleteNextShard ? null : "some-shard-iterator") + .millisBehindLatest(TestUtil.MILLIS_BEHIND_LATEST_TEST_VALUE) .build(); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java index c8c9963..eb9394a 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java @@ -20,14 +20,21 @@ package org.apache.flink.connector.kinesis.source.util; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.connector.kinesis.source.metrics.MetricConstants; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.testutils.MetricListener; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.model.Record; import java.time.Instant; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; /** Utilities class for testing Kinesis Source. */ public class TestUtil { @@ -36,6 +43,7 @@ public class TestUtil { "arn:aws:kinesis:us-east-1:123456789012:stream/keenesesStream"; public static final String SHARD_ID = "shardId-000000000002"; public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema(); + public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = 100L; public static String generateShardId(int shardId) { return String.format("shardId-%012d", shardId); @@ -75,4 +83,25 @@ public class TestUtil { .approximateArrivalTimestamp(Instant.now()) .build(); } + + public static void assertMillisBehindLatest( + KinesisShardSplit split, long expectedValue, MetricListener metricListener) { + Arn kinesisArn = Arn.fromString(split.getStreamArn()); + + final Optional<Gauge<Long>> millisBehindLatestGauge = + metricListener.getGauge( + MetricConstants.KINESIS_STREAM_SOURCE_METRIC_GROUP, + MetricConstants.ACCOUNT_ID_METRIC_GROUP, + kinesisArn.accountId().get(), + MetricConstants.REGION_METRIC_GROUP, + kinesisArn.region().get(), + MetricConstants.STREAM_METRIC_GROUP, + kinesisArn.resource().resource(), + MetricConstants.SHARD_METRIC_GROUP, + split.getShardId(), + MetricConstants.MILLIS_BEHIND_LATEST); + + assertThat(millisBehindLatestGauge).isPresent(); + assertThat((long) millisBehindLatestGauge.get().getValue()).isEqualTo(expectedValue); + } }