showuon commented on code in PR #14078:
URL: https://github.com/apache/kafka/pull/14078#discussion_r1300849860


##########
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwksTest.java:
##########
@@ -195,4 +231,68 @@ public String getBody() {
         return Mockito.spy(httpsJwks);
     }
 
+    /**
+     * A mock ScheduledExecutorService just for the test. Note that this is 
not a generally reusable mock as it does not
+     * implement some interfaces like scheduleWithFixedDelay, etc. And it does 
not return ScheduledFuture correctly.
+     */
+    private class MockExecutorService implements MockTime.Listener {
+        private final MockTime time;
+
+        private final TreeMap<Long, List<AbstractMap.SimpleEntry<Long, 
KafkaFutureImpl<Long>>>> waiters = new TreeMap<>();
+
+        public MockExecutorService(MockTime time) {
+            this.time = time;
+            time.addListener(this);
+        }
+
+        @Override
+        public synchronized void onTimeUpdated() {
+            long timeMs = time.milliseconds();
+            while (true) {
+                Map.Entry<Long, List<AbstractMap.SimpleEntry<Long, 
KafkaFutureImpl<Long>>>> entry = waiters.firstEntry();
+                if ((entry == null) || (entry.getKey() > timeMs)) {
+                    break;
+                }
+                for (AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>> pair 
: entry.getValue()) {
+                    pair.getValue().complete(timeMs);
+                    if (pair.getKey() != null) {
+                        addWaiter(entry.getKey() + pair.getKey(), 
pair.getKey(), pair.getValue());
+                    }
+                }
+                waiters.remove(entry.getKey());
+            }
+        }
+
+        private synchronized void addWaiter(long delayMs, Long period, 
KafkaFutureImpl<Long> waiter) {
+            long timeMs = time.milliseconds();
+            if (delayMs <= 0) {
+                waiter.complete(timeMs);
+            } else {
+                long triggerTimeMs = timeMs + delayMs;
+                List<AbstractMap.SimpleEntry<Long, KafkaFutureImpl<Long>>> 
futures =
+                        waiters.computeIfAbsent(triggerTimeMs, k -> new 
ArrayList<>());
+                futures.add(new AbstractMap.SimpleEntry<>(period, waiter));
+            }
+        }
+
+        /**
+         * Internal utility function for periodic or one time refreshes.
+         *
+         * @param period null indicates one time refresh, otherwise it is 
periodic.
+         */
+        public <T> ScheduledFuture<T> schedule(final Callable<T> callable, 
long delayMs, Long period) {
+
+            KafkaFutureImpl<Long> waiter = new KafkaFutureImpl<>();
+            waiter.thenApply((KafkaFuture.BaseFunction<Long, Void>) now -> {
+                try {
+                    callable.call();
+                } catch (Throwable ignored) {
+                }

Review Comment:
   OK, but we sometimes do `e.printStackTrace();` for this kind of situation 
like here:
   
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java#L272
   
   It will be helpful when this test someday become flaky. Could we add it? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to