This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 87fb442c223 [improve][test] Add solution to PulsarMockBookKeeper for 
intercepting reads (#23875)
87fb442c223 is described below

commit 87fb442c223d47d8a426b44575981345d7a23481
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 22 22:36:40 2025 -0800

    [improve][test] Add solution to PulsarMockBookKeeper for intercepting reads 
(#23875)
---
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  8 ++++-
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |  2 +-
 .../bookkeeper/client/PulsarMockReadHandle.java    | 31 +++++++++++------
 .../client/PulsarMockReadHandleInterceptor.java    | 40 ++++++++++++++++++++++
 4 files changed, 68 insertions(+), 13 deletions(-)

diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 1e979206e16..344173c3091 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -40,6 +40,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -96,6 +98,9 @@ public class PulsarMockBookKeeper extends BookKeeper {
     final Queue<Long> addEntryResponseDelaysMillis = new 
ConcurrentLinkedQueue<>();
     final List<CompletableFuture<Void>> failures = new ArrayList<>();
     final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
+    @Setter
+    @Getter
+    private volatile PulsarMockReadHandleInterceptor readHandleInterceptor;
 
     public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws 
Exception {
         this.orderedExecutor = orderedExecutor;
@@ -250,7 +255,8 @@ public class PulsarMockBookKeeper extends BookKeeper {
                                 return FutureUtils.exception(new 
BKException.BKUnauthorizedAccessException());
                             } else {
                                 return FutureUtils.value(new 
PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
-                                                                               
   lh.getLedgerMetadata(), lh.entries));
+                                        lh.getLedgerMetadata(), lh.entries,
+                                        
PulsarMockBookKeeper.this::getReadHandleInterceptor));
                             }
                         });
             }
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index aa61e541d0d..d30684e6046 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -73,7 +73,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
         this.digest = digest;
         this.passwd = Arrays.copyOf(passwd, passwd.length);
 
-        readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), 
entries);
+        readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), 
entries, bk::getReadHandleInterceptor);
     }
 
     @Override
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
index a4361f62254..9f3f4969199 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.client;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -40,28 +41,36 @@ class PulsarMockReadHandle implements ReadHandle {
     private final long ledgerId;
     private final LedgerMetadata metadata;
     private final List<LedgerEntryImpl> entries;
+    private final Supplier<PulsarMockReadHandleInterceptor> 
readHandleInterceptorSupplier;
 
     PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, 
LedgerMetadata metadata,
-                         List<LedgerEntryImpl> entries) {
+                         List<LedgerEntryImpl> entries,
+                         Supplier<PulsarMockReadHandleInterceptor> 
readHandleInterceptorSupplier) {
         this.bk = bk;
         this.ledgerId = ledgerId;
         this.metadata = metadata;
         this.entries = entries;
+        this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
     }
 
     @Override
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
         return bk.getProgrammedFailure().thenComposeAsync((res) -> {
-                log.debug("readEntries: first={} last={} total={}", 
firstEntry, lastEntry, entries.size());
-                List<LedgerEntry> seq = new ArrayList<>();
-                long entryId = firstEntry;
-                while (entryId <= lastEntry && entryId < entries.size()) {
-                    seq.add(entries.get((int) entryId++).duplicate());
-                }
-                log.debug("Entries read: {}", seq);
-
-                return FutureUtils.value(LedgerEntriesImpl.create(seq));
-            });
+            log.debug("readEntries: first={} last={} total={}", firstEntry, 
lastEntry, entries.size());
+            List<LedgerEntry> seq = new ArrayList<>();
+            long entryId = firstEntry;
+            while (entryId <= lastEntry && entryId < entries.size()) {
+                seq.add(entries.get((int) entryId++).duplicate());
+            }
+            log.debug("Entries read: {}", seq);
+            LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq);
+            PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor = 
readHandleInterceptorSupplier.get();
+            if (pulsarMockReadHandleInterceptor != null) {
+                return 
pulsarMockReadHandleInterceptor.interceptReadAsync(ledgerId, firstEntry, 
lastEntry,
+                        ledgerEntries);
+            }
+            return FutureUtils.value(ledgerEntries);
+        });
     }
 
     @Override
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java
new file mode 100644
index 00000000000..acee87b0f77
--- /dev/null
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+
+/**
+ * Interceptor interface for intercepting read handle readAsync operations.
+ * This is useful for testing purposes, for example for introducing delays.
+ */
+public interface PulsarMockReadHandleInterceptor {
+    /**
+     * Intercepts the readAsync operation on a read handle.
+     *
+     * @param ledgerId ledger id
+     * @param firstEntry first entry to read
+     * @param lastEntry  last entry to read
+     * @param entries    entries that would be returned by the read operation
+     * @return CompletableFuture that will complete with the entries to return
+     */
+    CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long 
firstEntry, long lastEntry,
+                                                        LedgerEntries entries);
+}

Reply via email to