This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 87fb442c223 [improve][test] Add solution to PulsarMockBookKeeper for
intercepting reads (#23875)
87fb442c223 is described below
commit 87fb442c223d47d8a426b44575981345d7a23481
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 22 22:36:40 2025 -0800
[improve][test] Add solution to PulsarMockBookKeeper for intercepting reads
(#23875)
---
.../bookkeeper/client/PulsarMockBookKeeper.java | 8 ++++-
.../bookkeeper/client/PulsarMockLedgerHandle.java | 2 +-
.../bookkeeper/client/PulsarMockReadHandle.java | 31 +++++++++++------
.../client/PulsarMockReadHandleInterceptor.java | 40 ++++++++++++++++++++++
4 files changed, 68 insertions(+), 13 deletions(-)
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 1e979206e16..344173c3091 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -40,6 +40,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -96,6 +98,9 @@ public class PulsarMockBookKeeper extends BookKeeper {
final Queue<Long> addEntryResponseDelaysMillis = new
ConcurrentLinkedQueue<>();
final List<CompletableFuture<Void>> failures = new ArrayList<>();
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();
+ @Setter
+ @Getter
+ private volatile PulsarMockReadHandleInterceptor readHandleInterceptor;
public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws
Exception {
this.orderedExecutor = orderedExecutor;
@@ -250,7 +255,8 @@ public class PulsarMockBookKeeper extends BookKeeper {
return FutureUtils.exception(new
BKException.BKUnauthorizedAccessException());
} else {
return FutureUtils.value(new
PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
-
lh.getLedgerMetadata(), lh.entries));
+ lh.getLedgerMetadata(), lh.entries,
+
PulsarMockBookKeeper.this::getReadHandleInterceptor));
}
});
}
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index aa61e541d0d..d30684e6046 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -73,7 +73,7 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
this.digest = digest;
this.passwd = Arrays.copyOf(passwd, passwd.length);
- readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(),
entries);
+ readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(),
entries, bk::getReadHandleInterceptor);
}
@Override
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
index a4361f62254..9f3f4969199 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.client;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -40,28 +41,36 @@ class PulsarMockReadHandle implements ReadHandle {
private final long ledgerId;
private final LedgerMetadata metadata;
private final List<LedgerEntryImpl> entries;
+ private final Supplier<PulsarMockReadHandleInterceptor>
readHandleInterceptorSupplier;
PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId,
LedgerMetadata metadata,
- List<LedgerEntryImpl> entries) {
+ List<LedgerEntryImpl> entries,
+ Supplier<PulsarMockReadHandleInterceptor>
readHandleInterceptorSupplier) {
this.bk = bk;
this.ledgerId = ledgerId;
this.metadata = metadata;
this.entries = entries;
+ this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
}
@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
return bk.getProgrammedFailure().thenComposeAsync((res) -> {
- log.debug("readEntries: first={} last={} total={}",
firstEntry, lastEntry, entries.size());
- List<LedgerEntry> seq = new ArrayList<>();
- long entryId = firstEntry;
- while (entryId <= lastEntry && entryId < entries.size()) {
- seq.add(entries.get((int) entryId++).duplicate());
- }
- log.debug("Entries read: {}", seq);
-
- return FutureUtils.value(LedgerEntriesImpl.create(seq));
- });
+ log.debug("readEntries: first={} last={} total={}", firstEntry,
lastEntry, entries.size());
+ List<LedgerEntry> seq = new ArrayList<>();
+ long entryId = firstEntry;
+ while (entryId <= lastEntry && entryId < entries.size()) {
+ seq.add(entries.get((int) entryId++).duplicate());
+ }
+ log.debug("Entries read: {}", seq);
+ LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq);
+ PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor =
readHandleInterceptorSupplier.get();
+ if (pulsarMockReadHandleInterceptor != null) {
+ return
pulsarMockReadHandleInterceptor.interceptReadAsync(ledgerId, firstEntry,
lastEntry,
+ ledgerEntries);
+ }
+ return FutureUtils.value(ledgerEntries);
+ });
}
@Override
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java
new file mode 100644
index 00000000000..acee87b0f77
--- /dev/null
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+
+/**
+ * Interceptor interface for intercepting read handle readAsync operations.
+ * This is useful for testing purposes, for example for introducing delays.
+ *
+ * <p>An interceptor is registered on {@code PulsarMockBookKeeper} through its
+ * {@code readHandleInterceptor} property. {@code PulsarMockReadHandle} looks the
+ * interceptor up on every {@code readAsync} call and skips interception when
+ * no interceptor is set (the property is {@code null}).
+ */
+public interface PulsarMockReadHandleInterceptor {
+ /**
+ * Intercepts the readAsync operation on a read handle.
+ *
+ * <p>{@code PulsarMockReadHandle} invokes this method after it has collected the
+ * entries for the requested range, and returns the future produced here to the
+ * caller of {@code readAsync} instead of completing with {@code entries} directly.
+ *
+ * @param ledgerId id of the ledger being read
+ * @param firstEntry id of the first entry requested by the read
+ * @param lastEntry id of the last entry requested by the read; the mock stops at
+ * the last stored entry, so {@code entries} may hold fewer entries than requested
+ * @param entries entries that would be returned by the read operation if it
+ * were not intercepted
+ * @return CompletableFuture that will complete with the entries to return to
+ * the reader; completing it later delays the read
+ */
+ CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long
firstEntry, long lastEntry,
+ LedgerEntries entries);
+}