This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 4537c8af5b2 KAFKA-17235 system test test_performance_service.py failed (#16789)
4537c8af5b2 is described below
commit 4537c8af5b26751445546a6e63211922333ca41d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Aug 6 14:51:28 2024 +0800
KAFKA-17235 system test test_performance_service.py failed (#16789)
related to https://issues.apache.org/jira/browse/KAFKA-17235
The root cause of this issue is a change we introduced in KAFKA-16879,
where we modified the PushHttpMetricsReporter constructor to use Time.SYSTEM
[1]. However, Time.SYSTEM doesn't exist in Kafka versions 0.8.2 and 0.9.
In test_performance_services.py, we have system tests for Kafka versions
0.8.2 and 0.9 [2]. These tests always use the tools JAR from the trunk branch,
regardless of the Kafka version being tested [3], while the client JAR aligns
with the Kafka version specified in the test suite [4]. This discrepancy is
what causes the issue to arise.
To resolve this issue, we have a few options:
1) Add Time.SYSTEM to Kafka 0.8.2 and 0.9: This isn't practical, as we no
longer maintain these versions.
2) Modify the PushHttpMetricsReporter constructor to use new SystemTime()
instead of Time.SYSTEM: This would contradict the intent of KAFKA-16879, which
aims to make SystemTime a singleton.
3) Implement the time source inside PushHttpMetricsReporter and use it to get
the current time, instead of relying on Time.SYSTEM.
4) Remove the system tests for Kafka 0.8.2 and 0.9 from
test_performance_services.py.
Given that we no longer maintain Kafka 0.8.2 and 0.9, and altering the
constructor goes against the design goals of KAFKA-16879, option 4 appears to
be the most feasible solution. However, I'm not sure whether it's acceptable to
remove these old version tests. Maybe someone else has a better solution.
"We'll proceed with option 3 since support for versions 0.8 and 0.9 is
still required, meaning we can't remove those Kafka versions from the system
tests."
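
A minimal sketch of the idea behind option 3, assuming a simplified
stand-in for the reporter: the class keeps only a JDK Supplier<Long> as its
clock, so the default path never references a field from the Kafka clients
jar. The names ClockSupplierSketch, Reporter and sampleTimestamp are
hypothetical and exist only for this illustration; the actual change is in
the diff below.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical illustration; not the real PushHttpMetricsReporter.
public class ClockSupplierSketch {

    // Simplified stand-in that only models how the current time is obtained.
    static final class Reporter {
        private final Supplier<Long> currentTimeMillis;

        // Default path: uses the JDK clock directly, so no static field
        // from a client library has to exist at runtime.
        Reporter() {
            this(System::currentTimeMillis);
        }

        // Test path: any clock can be injected as a supplier.
        Reporter(Supplier<Long> currentTimeMillis) {
            this.currentTimeMillis = currentTimeMillis;
        }

        // Returns the timestamp the reporter would attach to a sample.
        long sampleTimestamp() {
            return currentTimeMillis.get();
        }
    }

    public static void main(String[] args) {
        // A controllable fake clock, playing the role of a mock time source.
        AtomicLong fakeClock = new AtomicLong(1_000L);
        Reporter reporter = new Reporter(fakeClock::get);

        System.out.println(reporter.sampleTimestamp()); // prints 1000
        fakeClock.addAndGet(500L);
        System.out.println(reporter.sampleTimestamp()); // prints 1500
    }
}
```

Because the supplier is read on every call, advancing the fake clock is
visible immediately, which is the same property the test-only constructor
in the diff relies on when it wraps mockTime::milliseconds.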
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/tools/PushHttpMetricsReporter.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index d031baac2b8..c13117ba29f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -50,6 +50,7 @@ import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
/**
* MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
@@ -72,7 +73,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
}
private final Object lock = new Object();
- private final Time time;
+ private final Supplier<Long> currentTimeMillis;
private final ScheduledExecutorService executor;
// The set of metrics are updated in init/metricChange/metricRemoval
private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
@@ -96,12 +97,17 @@ public class PushHttpMetricsReporter implements MetricsReporter {
"producer/consumer/streams/connect instance");
public PushHttpMetricsReporter() {
- time = Time.SYSTEM;
+ // In test_performance_services.py, we have system tests for Kafka versions 0.8.2 and 0.9.
+ // These tests always use the tools jar from the trunk branch, regardless of the Kafka version being tested,
+ // while the client jar aligns with the Kafka version specified in the test suite. To ensure these system test
+ // passed, we need to make this class compatible with older client jars. This discrepancy force us not to use
+ // `Time.SYSTEM` here as there is no such field in the older Kafka version.
+ currentTimeMillis = System::currentTimeMillis;
executor = Executors.newSingleThreadScheduledExecutor();
}
PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
- time = mockTime;
+ currentTimeMillis = mockTime::milliseconds;
executor = mockExecutor;
}
@@ -169,7 +175,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
private class HttpReporter implements Runnable {
@Override
public void run() {
- long now = time.milliseconds();
+ long now = currentTimeMillis.get();
final List<MetricValue> samples;
synchronized (lock) {
samples = new ArrayList<>(metrics.size());