This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 4537c8af5b2 KAFKA-17235 system test test_performance_service.py failed 
(#16789)
4537c8af5b2 is described below

commit 4537c8af5b26751445546a6e63211922333ca41d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Aug 6 14:51:28 2024 +0800

    KAFKA-17235 system test test_performance_service.py failed (#16789)
    
    related to https://issues.apache.org/jira/browse/KAFKA-17235
    
    The root cause of this issue is a change we introduced in KAFKA-16879, 
where we modified the PushHttpMetricsReporter constructor to use Time.System 
[1]. However, Time.System doesn't exist in Kafka versions 0.8.2 and 0.9.
    
    In test_performance_services.py, we have system tests for Kafka versions 
0.8.2 and 0.9 [2]. These tests always use the tools JAR from the trunk branch, 
regardless of the Kafka version being tested [3], while the client JAR aligns 
with the Kafka version specified in the test suite [4]. This discrepancy is 
what causes the issue to arise.
    
    To resolve this issue, we have a few options:
    
    1) Add Time.System to Kafka 0.8.2 and 0.9: This isn't practical, as we no 
longer maintain these versions.
    2) Modify the PushHttpMetricsReporter constructor to use new SystemTime() 
instead of Time.System: This would contradict the intent of KAFKA-16879, which 
aims to make SystemTime a singleton.
    3) Implement Time in PushHttpMetricsReporter use the time to get current 
time
    4) Remove system tests for Kafka 0.8.2 and 0.9 from 
test_performance_services.py
    
    Given that we no longer maintain Kafka 0.8.2 and 0.9, and altering the 
constructor goes against the design goals of KAFKA-16879, option 4 appears to 
be the most feasible solution. However, I'm not sure whether it's acceptable to 
remove these old version tests. Maybe someone else has a better solution
    
    "We'll proceed with option 3 since support for versions 0.8 and 0.9 is 
still required, meaning we can't remove those Kafka versions from the system 
tests."
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/tools/PushHttpMetricsReporter.java    | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java 
b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index d031baac2b8..c13117ba29f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -50,6 +50,7 @@ import java.util.Scanner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 /**
  * MetricsReporter that aggregates metrics data and reports it via HTTP 
requests to a configurable
@@ -72,7 +73,7 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
     }
 
     private final Object lock = new Object();
-    private final Time time;
+    private final Supplier<Long> currentTimeMillis;
     private final ScheduledExecutorService executor;
     // The set of metrics are updated in init/metricChange/metricRemoval
     private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
@@ -96,12 +97,17 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
                             "producer/consumer/streams/connect instance");
 
     public PushHttpMetricsReporter() {
-        time = Time.SYSTEM;
+        // In test_performance_services.py, we have system tests for Kafka 
versions 0.8.2 and 0.9.
+        // These tests always use the tools jar from the trunk branch, 
regardless of the Kafka version being tested,
+        // while the client jar aligns with the Kafka version specified in the 
test suite. To ensure these system test
+        // passed, we need to make this class compatible with older client 
jars. This discrepancy force us not to use
+        // `Time.SYSTEM` here as there is no such field in the older Kafka 
version.
+        currentTimeMillis = System::currentTimeMillis;
         executor = Executors.newSingleThreadScheduledExecutor();
     }
 
     PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService 
mockExecutor) {
-        time = mockTime;
+        currentTimeMillis = mockTime::milliseconds;
         executor = mockExecutor;
     }
 
@@ -169,7 +175,7 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
     private class HttpReporter implements Runnable {
         @Override
         public void run() {
-            long now = time.milliseconds();
+            long now = currentTimeMillis.get();
             final List<MetricValue> samples;
             synchronized (lock) {
                 samples = new ArrayList<>(metrics.size());

Reply via email to