JingGe commented on a change in pull request #17773:
URL: https://github.com/apache/flink/pull/17773#discussion_r749161803
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
##########
@@ -175,30 +184,33 @@ public void testPendingRecordsGauge() throws Exception {
        reader.handleSplitsChanges(
                new SplitsAddition<>(
                        Collections.singletonList(
-                               new KafkaPartitionSplit(new TopicPartition(TOPIC1, 0), 0L))));
+                               new KafkaPartitionSplit(new TopicPartition(topic1Name, 0), 0L))));
        // pendingRecords should have not been registered because of lazily registration
-       assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
+       Assertions.assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
        // Trigger first fetch
        reader.fetch();
        final Optional<Gauge<Long>> pendingRecords =
                metricListener.getGauge(MetricNames.PENDING_RECORDS);
-       assertTrue(pendingRecords.isPresent());
+       Assertions.assertTrue(pendingRecords.isPresent());
        // Validate pendingRecords
-       assertNotNull(pendingRecords);
-       assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
+       Assertions.assertNotNull(pendingRecords);
+       Assertions.assertEquals(
+               NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
        for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
            reader.fetch();
-           assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
+           Assertions.assertEquals(
+                   NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
        }
        // Add another split
        reader.handleSplitsChanges(
                new SplitsAddition<>(
                        Collections.singletonList(
-                               new KafkaPartitionSplit(new TopicPartition(TOPIC2, 0), 0L))));
+                               new KafkaPartitionSplit(new TopicPartition(topic2Name, 0), 0L))));
        // Validate pendingRecords
        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
            reader.fetch();
-           assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
+           Assertions.assertEquals(
+                   NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
Review comment:
Nit: all the assert changes could be avoided by using a static import.
This would help reduce the review effort and minimise unexpected human
errors, e.g. typos.
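For context, the static-import approach the reviewer suggests keeps call sites unqualified, so only the import block changes when an assertion class moves. In JUnit 5 that would be `import static org.junit.jupiter.api.Assertions.assertEquals;` etc.; the self-contained sketch below uses `java.lang.Math` as a stand-in so it compiles without any test dependency.

```java
// Sketch: how a static import removes the qualifying class name at every
// call site. JUnit 5's Assertions works the same way, e.g.
//   import static org.junit.jupiter.api.Assertions.assertEquals;
// java.lang.Math stands in here so the example needs no extra dependency.
import static java.lang.Math.max;
import static java.lang.Math.min;

public class StaticImportDemo {
    public static void main(String[] args) {
        // Without static imports each call reads Math.max(...) / Math.min(...).
        // With them the call sites stay short, so a refactor that changes the
        // owning class touches only the import block, not every assertion line.
        int hi = max(3, 7); // instead of Math.max(3, 7)
        int lo = min(3, 7); // instead of Math.min(3, 7)
        System.out.println(hi + " " + lo);
    }
}
```

This is why the diff above would shrink to near-zero if the test kept `assertEquals(...)` and only swapped the import to JUnit 5's `Assertions`.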
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/metrics/KafkaSourceReaderMetrics.java
##########
@@ -297,9 +299,9 @@ private void checkTopicPartitionTracked(TopicPartition tp) {
        return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP)
                && metricName.name().equals(RECORDS_LAG)
                && tags.containsKey("topic")
-               && tags.get("topic").equals(tp.topic())
+               && tags.get("topic").equals(resolvedTopic)
                && tags.containsKey("partition")
-               && tags.get("partition").equals(String.valueOf(tp.partition()));
+               && tags.get("partition").equals(resolvedPartition);
Review comment:
May I know the reason for introducing local variables instead of changing
it directly?
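For reference, the inlined form the reviewer is asking about would call the accessors directly inside the predicate. A minimal self-contained sketch, with a hypothetical `TopicPartition` record standing in for Kafka's class:

```java
import java.util.HashMap;
import java.util.Map;

public class InlineVsLocalDemo {
    // Stand-in for org.apache.kafka.common.TopicPartition so the sketch
    // compiles on its own (hypothetical, for illustration only).
    record TopicPartition(String topic, int partition) {}

    // Inlined form: tp.topic() / tp.partition() are called directly in the
    // predicate instead of being bound to local variables first.
    static boolean matches(Map<String, String> tags, TopicPartition tp) {
        return tags.containsKey("topic")
                && tags.get("topic").equals(tp.topic())
                && tags.containsKey("partition")
                && tags.get("partition").equals(String.valueOf(tp.partition()));
    }

    public static void main(String[] args) {
        Map<String, String> tags = new HashMap<>();
        tags.put("topic", "test-topic");
        tags.put("partition", "0");
        System.out.println(matches(tags, new TopicPartition("test-topic", 0)));
    }
}
```

A common reason to hoist `resolvedTopic`/`resolvedPartition` into locals anyway is to compute them once before a predicate that runs per metric, but that is a judgment call the PR author would need to confirm.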
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
##########
@@ -162,8 +161,18 @@ public void testNumBytesInCounter() throws Exception {
        assertThat(numBytesInCounter.getCount(), Matchers.greaterThan(latestNumBytesIn));
    }

-   @Test
-   public void testPendingRecordsGauge() throws Exception {
+   @ParameterizedTest
+   @EmptySource
+   @ValueSource(strings = {"_underscore.period-minus"})
+   public void testPendingRecordsGauge(String topicSuffix) throws Throwable {
+       final String topic1Name = TOPIC1 + topicSuffix;
+       final String topic2Name = TOPIC2 + topicSuffix;
+       if (!topicSuffix.isEmpty()) {
+           KafkaSourceTestEnv.setupTopic(
+                   topic1Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+           KafkaSourceTestEnv.setupTopic(
+                   topic2Name, true, true, KafkaSourceTestEnv::getRecordsForTopic);
+       }
Review comment:
Does this mean that the rest of the test still works for `@EmptySource`,
i.e. when `topicSuffix.isEmpty() == true` and no topic has been created?
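For context on the question: in JUnit 5, `@EmptySource` and `@ValueSource` combine so the test method is invoked once per argument, here once with `""` and once with `"_underscore.period-minus"`. Only the second invocation enters the setup branch. A minimal plain-Java sketch of that control flow, with the topic-creation call stubbed out by a print:

```java
import java.util.List;

public class SuffixInvocationsDemo {
    public static void main(String[] args) {
        // JUnit 5 runs a @ParameterizedTest once per supplied argument:
        // @EmptySource contributes "" and @ValueSource contributes the suffix.
        List<String> suffixes = List.of("", "_underscore.period-minus");
        for (String topicSuffix : suffixes) {
            if (!topicSuffix.isEmpty()) {
                // In the real test this is where setupTopic(...) runs.
                System.out.println("creates suffixed topics: " + topicSuffix);
            } else {
                // Empty suffix: the test relies on TOPIC1/TOPIC2 existing already.
                System.out.println("reuses pre-created base topics");
            }
        }
    }
}
```

So the reviewer's concern is exactly the `else` path: the empty-suffix run only works if the base topics were already created elsewhere in the test environment.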
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]