sijie closed pull request #2544: Readd MockBookKeeper to Pulsar
URL: https://github.com/apache/incubator-pulsar/pull/2544
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
new file mode 100644
index 0000000000..f3689e91e2
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.OpenBuilderBase;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocked version of BookKeeper client that keeps all ledgers data in memory.
+ *
+ * <p>This mocked client is meant to be used in unit tests for applications 
using the BookKeeper API.
+ */
+public class PulsarMockBookKeeper extends BookKeeper {
+
+    final ExecutorService executor;
+    final ZooKeeper zkc;
+
+    @Override
+    public ZooKeeper getZkHandle() {
+        return zkc;
+    }
+
+    @Override
+    public ClientConfiguration getConf() {
+        return super.getConf();
+    }
+
+    Map<Long, PulsarMockLedgerHandle> ledgers = new ConcurrentHashMap<>();
+    AtomicLong sequence = new AtomicLong(3);
+
+    CompletableFuture<Void> defaultResponse = 
CompletableFuture.completedFuture(null);
+    List<CompletableFuture<Void>> failures = new ArrayList<>();
+
+    public PulsarMockBookKeeper(ZooKeeper zkc, ExecutorService executor) 
throws Exception {
+        this.zkc = zkc;
+        this.executor = executor;
+    }
+
+    @Override
+    public LedgerHandle createLedger(DigestType digestType, byte passwd[])
+            throws BKException, InterruptedException {
+        return createLedger(3, 2, digestType, passwd);
+    }
+
+    @Override
+    public LedgerHandle createLedger(int ensSize, int qSize, DigestType 
digestType, byte passwd[])
+            throws BKException, InterruptedException {
+        return createLedger(ensSize, qSize, qSize, digestType, passwd);
+    }
+
+    @Override
+    public void asyncCreateLedger(int ensSize, int writeQuorumSize, int 
ackQuorumSize, final DigestType digestType,
+            final byte[] passwd, final CreateCallback cb, final Object ctx, 
Map<String, byte[]> properties) {
+        getProgrammedFailure().thenComposeAsync((res) -> {
+                try {
+                    long id = sequence.getAndIncrement();
+                    log.info("Creating ledger {}", id);
+                    PulsarMockLedgerHandle lh = new 
PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd);
+                    ledgers.put(id, lh);
+                    return FutureUtils.value(lh);
+                } catch (Throwable t) {
+                    return FutureUtils.exception(t);
+                }
+            }, executor).whenCompleteAsync((lh, exception) -> {
+                    if (exception != null) {
+                        cb.createComplete(getExceptionCode(exception), null, 
ctx);
+                    } else {
+                        cb.createComplete(BKException.Code.OK, lh, ctx);
+                    }
+                }, executor);
+    }
+
+    @Override
+    public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int 
ackQuorumSize, DigestType digestType,
+            byte[] passwd) throws BKException, InterruptedException {
+        checkProgrammedFail();
+
+        try {
+            long id = sequence.getAndIncrement();
+            log.info("Creating ledger {}", id);
+            PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, 
digestType, passwd);
+            ledgers.put(id, lh);
+            return lh;
+        } catch (Throwable t) {
+            log.error("Exception:", t);
+            return null;
+        }
+    }
+
+    @Override
+    public void asyncCreateLedger(int ensSize, int qSize, DigestType 
digestType, byte[] passwd, CreateCallback cb,
+            Object ctx) {
+        asyncCreateLedger(ensSize, qSize, qSize, digestType, passwd, cb, ctx, 
Collections.emptyMap());
+    }
+
+    @Override
+    public void asyncOpenLedger(long lId, DigestType digestType, byte[] 
passwd, OpenCallback cb, Object ctx) {
+        getProgrammedFailure().thenComposeAsync((res) -> {
+                PulsarMockLedgerHandle lh = ledgers.get(lId);
+                if (lh == null) {
+                    return FutureUtils.exception(new 
BKException.BKNoSuchLedgerExistsException());
+                } else if (lh.digest != digestType) {
+                    return FutureUtils.exception(new 
BKException.BKDigestMatchException());
+                } else if (!Arrays.equals(lh.passwd, passwd)) {
+                    return FutureUtils.exception(new 
BKException.BKUnauthorizedAccessException());
+                } else {
+                    return FutureUtils.value(lh);
+                }
+            }, executor).whenCompleteAsync((ledger, exception) -> {
+                    if (exception != null) {
+                        cb.openComplete(getExceptionCode(exception), null, 
ctx);
+                    } else {
+                        cb.openComplete(BKException.Code.OK, ledger, ctx);
+                    }
+                }, executor);
+    }
+
+    @Override
+    public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, 
byte[] passwd, OpenCallback cb, Object ctx) {
+        asyncOpenLedger(lId, digestType, passwd, cb, ctx);
+    }
+
+    @Override
+    public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
+        getProgrammedFailure().thenComposeAsync((res) -> {
+                if (ledgers.containsKey(lId)) {
+                    ledgers.remove(lId);
+                    return FutureUtils.value(null);
+                } else {
+                    return FutureUtils.exception(new 
BKException.BKNoSuchLedgerExistsException());
+                }
+            }, executor).whenCompleteAsync((res, exception) -> {
+                    if (exception != null) {
+                        cb.deleteComplete(getExceptionCode(exception), ctx);
+                    } else {
+                        cb.deleteComplete(BKException.Code.OK, ctx);
+                    }
+                }, executor);
+    }
+
+    @Override
+    public void deleteLedger(long lId) throws InterruptedException, 
BKException {
+        checkProgrammedFail();
+
+        if (!ledgers.containsKey(lId)) {
+            throw 
BKException.create(BKException.Code.NoSuchLedgerExistsException);
+        }
+
+        ledgers.remove(lId);
+    }
+
+    @Override
+    public void close() throws InterruptedException, BKException {
+        shutdown();
+    }
+
+    @Override
+    public OpenBuilder newOpenLedgerOp() {
+        return new OpenBuilderBase() {
+            @Override
+            public CompletableFuture<ReadHandle> execute() {
+                return getProgrammedFailure().thenCompose(
+                        (res) -> {
+                            if (!validate()) {
+                                return FutureUtils.exception(new 
BKException.BKNoSuchLedgerExistsException());
+                            }
+
+                            PulsarMockLedgerHandle lh = ledgers.get(ledgerId);
+                            if (lh == null) {
+                                return FutureUtils.exception(new 
BKException.BKNoSuchLedgerExistsException());
+                            } else if (lh.digest != 
DigestType.fromApiDigestType(digestType)) {
+                                return FutureUtils.exception(new 
BKException.BKDigestMatchException());
+                            } else if (!Arrays.equals(lh.passwd, password)) {
+                                return FutureUtils.exception(new 
BKException.BKUnauthorizedAccessException());
+                            } else {
+                                return FutureUtils.value(new 
PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
+                                                                               
   lh.getLedgerMetadata(), lh.entries));
+                            }
+                        });
+            }
+        };
+    }
+
+    public void shutdown() {
+        try {
+            super.close();
+        } catch (Exception e) {
+        }
+        synchronized (this) {
+            defaultResponse = FutureUtils.exception(new 
BKException.BKClientClosedException());
+        }
+        for (PulsarMockLedgerHandle ledger : ledgers.values()) {
+            ledger.entries.clear();
+        }
+
+        ledgers.clear();
+    }
+
+    public Set<Long> getLedgers() {
+        return ledgers.keySet();
+    }
+
+    void checkProgrammedFail() throws BKException, InterruptedException {
+        try {
+            getProgrammedFailure().get();
+        } catch (ExecutionException ee) {
+            if (ee.getCause() instanceof BKException) {
+                throw (BKException)ee.getCause();
+            } else {
+                throw new BKException.BKUnexpectedConditionException();
+            }
+        }
+    }
+
+    synchronized CompletableFuture<Void> getProgrammedFailure() {
+        return failures.isEmpty() ? defaultResponse : failures.remove(0);
+    }
+
+    public void failNow(int rc) {
+        failAfter(0, rc);
+    }
+
+    public void failAfter(int steps, int rc) {
+        promiseAfter(steps).completeExceptionally(BKException.create(rc));
+    }
+
+    public synchronized CompletableFuture<Void> promiseAfter(int steps) {
+        while (failures.size() <= steps) {
+            failures.add(defaultResponse);
+        }
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        failures.set(steps, promise);
+        return promise;
+    }
+
+    static int getExceptionCode(Throwable t) {
+        if (t instanceof BKException) {
+            return ((BKException) t).getCode();
+        } else if (t.getCause() != null) {
+            return getExceptionCode(t.getCause());
+        } else {
+            return BKException.Code.UnexpectedConditionException;
+        }
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarMockBookKeeper.class);
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
new file mode 100644
index 0000000000..3037a7899e
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.security.GeneralSecurityException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Enumeration;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock BK {@link LedgerHandle}. Used by {@link PulsarMockBookKeeper}.
+ */
+public class PulsarMockLedgerHandle extends LedgerHandle {
+
+    final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
+    final PulsarMockBookKeeper bk;
+    final long id;
+    final DigestType digest;
+    final byte[] passwd;
+    final ReadHandle readHandle;
+    long lastEntry = -1;
+    boolean fenced = false;
+
+    PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+                           DigestType digest, byte[] passwd) throws 
GeneralSecurityException {
+        super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, 
"".getBytes()), DigestType.MAC, "".getBytes(),
+                EnumSet.noneOf(WriteFlag.class));
+        this.bk = bk;
+        this.id = id;
+        this.digest = digest;
+        this.passwd = Arrays.copyOf(passwd, passwd.length);
+
+        readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), 
entries);
+    }
+
+    @Override
+    public void asyncClose(CloseCallback cb, Object ctx) {
+        bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                fenced = true;
+                return FutureUtils.value(null);
+            }, bk.executor).whenCompleteAsync((res, exception) -> {
+                    if (exception != null) {
+                        
cb.closeComplete(PulsarMockBookKeeper.getExceptionCode(exception), null, ctx);
+                    } else {
+                        cb.closeComplete(BKException.Code.OK, this, ctx);
+                    }
+                }, bk.executor);
+    }
+
+    @Override
+    public void asyncReadEntries(final long firstEntry, final long lastEntry, 
final ReadCallback cb, final Object ctx) {
+        bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                log.debug("readEntries: first={} last={} total={}", 
firstEntry, lastEntry, entries.size());
+                final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
+                long entryId = firstEntry;
+                while (entryId <= lastEntry && entryId < entries.size()) {
+                    seq.add(new LedgerEntry(entries.get((int) 
entryId++).duplicate()));
+                }
+
+                log.debug("Entries read: {}", seq);
+
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                }
+
+                Enumeration<LedgerEntry> entries = new 
Enumeration<LedgerEntry>() {
+                        @Override
+                        public boolean hasMoreElements() {
+                            return !seq.isEmpty();
+                        }
+
+                        @Override
+                        public LedgerEntry nextElement() {
+                            return seq.remove();
+                        }
+                    };
+                return FutureUtils.value(entries);
+            }).whenCompleteAsync((res, exception) -> {
+                    if (exception != null) {
+                        
cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception), 
PulsarMockLedgerHandle.this, null, ctx);
+                    } else {
+                        cb.readComplete(BKException.Code.OK, 
PulsarMockLedgerHandle.this, res, ctx);
+                    }
+                }, bk.executor);
+    }
+
+    @Override
+    public long addEntry(byte[] data) throws InterruptedException, BKException 
{
+        try {
+            bk.checkProgrammedFail();
+        } catch (BKException e) {
+            fenced = true;
+            throw e;
+        }
+
+        if (fenced) {
+            throw BKException.create(BKException.Code.LedgerFencedException);
+        }
+
+        lastEntry = entries.size();
+        entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, 
Unpooled.wrappedBuffer(data)));
+        return lastEntry;
+    }
+
+    @Override
+    public void asyncAddEntry(final byte[] data, final AddCallback cb, final 
Object ctx) {
+        asyncAddEntry(data, 0, data.length, cb, ctx);
+    }
+
+    @Override
+    public void asyncAddEntry(final byte[] data, final int offset, final int 
length, final AddCallback cb,
+            final Object ctx) {
+        asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
+    }
+
+    @Override
+    public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final 
Object ctx) {
+        data.retain();
+        bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                }
+
+                if (fenced) {
+                    return FutureUtils.exception(new 
BKException.BKLedgerFencedException());
+                } else {
+                    lastEntry = entries.size();
+                    byte[] storedData = new byte[data.readableBytes()];
+                    data.readBytes(storedData);
+                    entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
+                                                       storedData.length, 
Unpooled.wrappedBuffer(storedData)));
+                    return FutureUtils.value(lastEntry);
+                }
+
+            }, bk.executor).whenCompleteAsync((entryId, exception) -> {
+                    data.release();
+                    if (exception != null) {
+                        fenced = true;
+                        
cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
+                                       PulsarMockLedgerHandle.this, 
INVALID_ENTRY_ID, ctx);
+                    } else {
+                        cb.addComplete(BKException.Code.OK, 
PulsarMockLedgerHandle.this, entryId, ctx);
+                    }
+                });
+    }
+
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return lastEntry;
+    }
+
+    @Override
+    public long getLength() {
+        long length = 0;
+        for (LedgerEntryImpl entry : entries) {
+            length += entry.getLength();
+        }
+
+        return length;
+    }
+
+
+    // ReadHandle interface
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
+        return readHandle.readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
+        return readHandle.readUnconfirmedAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return readHandle.readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readHandle.tryReadLastAddConfirmedAsync();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return readHandle.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> 
readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                               
       long timeOutInMillis,
+                                                                               
       boolean parallel) {
+        return readHandle.readLastAddConfirmedAndEntryAsync(entryId, 
timeOutInMillis, parallel);
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarMockLedgerHandle.class);
+
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
new file mode 100644
index 0000000000..30bcf46210
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+
+/**
+ * Mock implementation of ReadHandle.
+ */
+@Slf4j
+class PulsarMockReadHandle implements ReadHandle {
+    private final PulsarMockBookKeeper bk;
+    private final long ledgerId;
+    private final LedgerMetadata metadata;
+    private final List<LedgerEntryImpl> entries;
+
+    PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, 
LedgerMetadata metadata, List<LedgerEntryImpl> entries) {
+        this.bk = bk;
+        this.ledgerId = ledgerId;
+        this.metadata = metadata;
+        this.entries = entries;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
+        return bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                log.debug("readEntries: first={} last={} total={}", 
firstEntry, lastEntry, entries.size());
+                List<LedgerEntry> seq = new ArrayList<>();
+                long entryId = firstEntry;
+                while (entryId <= lastEntry && entryId < entries.size()) {
+                    seq.add(entries.get((int) entryId++).duplicate());
+                }
+                log.debug("Entries read: {}", seq);
+
+                return FutureUtils.value(LedgerEntriesImpl.create(seq));
+            });
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
+        return readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return entries.get(entries.size() - 1).getEntryId();
+    }
+
+    @Override
+    public long getLength() {
+        long length = 0;
+        for (LedgerEntryImpl entry : entries) {
+            length += entry.getLength();
+        }
+
+        return length;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return metadata.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> 
readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                               
       long timeOutInMillis,
+                                                                               
       boolean parallel) {
+        CompletableFuture<LastConfirmedAndEntry> promise = new 
CompletableFuture<>();
+        promise.completeExceptionally(new UnsupportedOperationException("Long 
poll not implemented"));
+        return promise;
+    }
+
+    // Handle interface
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return metadata;
+    }
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 35eb986df5..0f0d78917a 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -435,8 +435,8 @@ public void recoverLongTimeAfterMultipleWriteErrors() 
throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("recoverLongTimeAfterMultipleWriteErrors");
         ManagedCursor cursor = ledger.openCursor("c1");
 
-        bkc.failNow(BKException.Code.BookieHandleNotAvailableException,
-                BKException.Code.BookieHandleNotAvailableException);
+        bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException);
+        bkc.failAfter(1, BKException.Code.BookieHandleNotAvailableException);
 
         CountDownLatch counter = new CountDownLatch(2);
         AtomicReference<ManagedLedgerException> ex = new AtomicReference<>();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index e3c18a07d8..a62a1627e9 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -22,7 +22,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
@@ -47,7 +47,7 @@
     protected MockZooKeeper zkc;
 
     // BookKeeper related variables
-    protected MockBookKeeper bkc;
+    protected PulsarMockBookKeeper bkc;
     protected int numBookies;
 
     protected ManagedLedgerFactoryImpl factory;
@@ -117,7 +117,7 @@ protected void startBookKeeper() throws Exception {
 
         zkc.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(), null, null);
 
-        bkc = new MockBookKeeper(zkc);
+        bkc = new PulsarMockBookKeeper(zkc, executor.chooseThread(this));
     }
 
     protected void stopBookKeeper() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 1922da75c0..728bc67613 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -18,20 +18,33 @@
  */
 package org.apache.pulsar.broker;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+
 import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MockedBookKeeperClientFactory implements BookKeeperClientFactory {
+    private static final Logger log = 
LoggerFactory.getLogger(MockedBookKeeperClientFactory.class);
 
     private final BookKeeper mockedBk;
+    private final ExecutorService executor;
 
     public MockedBookKeeperClientFactory() {
         try {
-            mockedBk = new MockBookKeeper(null);
+            executor = Executors.newSingleThreadExecutor(
+                    new 
ThreadFactoryBuilder().setNameFormat("mock-bk-client-factory")
+                    .setUncaughtExceptionHandler((thread, ex) -> 
log.info("Uncaught exception", ex))
+                    .build());
+            mockedBk = new PulsarMockBookKeeper(null, executor);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -48,5 +61,6 @@ public void close() {
             mockedBk.close();
         } catch (BKException | InterruptedException e) {
         }
+        executor.shutdown();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index bc98efa557..5c64ac8038 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -21,19 +21,23 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -82,6 +86,7 @@
     protected final String configClusterName = "test";
 
     private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
+    private ExecutorService bkExecutor;
 
     public MockedPulsarServiceBaseTest() {
         resetConfig();
@@ -122,10 +127,14 @@ protected final void internalSetupForStatsTest() throws 
Exception {
     }
 
     protected final void init() throws Exception {
-        mockZookKeeper = createMockZooKeeper();
-        mockBookKeeper = createMockBookKeeper(mockZookKeeper);
-
         sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
+        bkExecutor = Executors.newSingleThreadExecutor(
+                new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
+                .setUncaughtExceptionHandler((thread, ex) -> 
log.info("Uncaught exception", ex))
+                .build());
+
+        mockZookKeeper = createMockZooKeeper();
+        mockBookKeeper = createMockBookKeeper(mockZookKeeper, bkExecutor);
 
         startBroker();
 
@@ -157,6 +166,9 @@ protected final void internalCleanup() throws Exception {
             if (sameThreadOrderedSafeExecutor != null) {
                 sameThreadOrderedSafeExecutor.shutdown();
             }
+            if (bkExecutor != null) {
+                bkExecutor.shutdown();
+            }
         } catch (Exception e) {
             log.warn("Failed to clean up mocked pulsar service:", e);
             throw e;
@@ -221,15 +233,16 @@ public static MockZooKeeper createMockZooKeeper() throws 
Exception {
         return zk;
     }
 
-    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper 
zookeeper) throws Exception {
-        return spy(new NonClosableMockBookKeeper(new ClientConfiguration(), 
zookeeper));
+    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper 
zookeeper,
+                                                                 
ExecutorService executor) throws Exception {
+        return spy(new NonClosableMockBookKeeper(zookeeper, executor));
     }
 
     // Prevent the MockBookKeeper instance from being closed when the broker 
is restarted within a test
-    public static class NonClosableMockBookKeeper extends MockBookKeeper {
+    public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper 
{
 
-        public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper 
zk) throws Exception {
-            super(zk);
+        public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService 
executor) throws Exception {
+            super(zk, executor);
         }
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index c1be8d1b2f..9db657a059 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -113,7 +113,8 @@ public void setup() throws Exception {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
-        
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
+        doReturn(createMockBookKeeper(mockZk, 
pulsar.getOrderedExecutor().chooseThread(0)))
+            .when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f8ac40a945..f93aa944ee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -153,7 +153,8 @@ public void setup() throws Exception {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
-        
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
+        doReturn(createMockBookKeeper(mockZk, 
pulsar.getOrderedExecutor().chooseThread(0)))
+            .when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index f746571f8c..1145321943 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -157,7 +157,8 @@ public void setup() throws Exception {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
-        
doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
+        doReturn(createMockBookKeeper(mockZk, 
pulsar.getOrderedExecutor().chooseThread(0)))
+            .when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         ZooKeeperDataCache<Policies> zkDataCache = 
mock(ZooKeeperDataCache.class);
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index eb88d37411..8fe170dbe9 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -43,7 +43,7 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -83,11 +83,11 @@ private static MockZooKeeper createMockZooKeeper() throws 
Exception {
     private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
     private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
     final OrderedScheduler scheduler;
-    final MockBookKeeper bk;
+    final PulsarMockBookKeeper bk;
 
     BlobStoreManagedLedgerOffloaderTest() throws Exception {
         scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
-        bk = new MockBookKeeper(createMockZooKeeper());
+        bk = new PulsarMockBookKeeper(createMockZooKeeper(), 
scheduler.chooseThread(this));
     }
 
     private ReadHandle buildReadHandle() throws Exception {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to