sijie closed pull request #2544: Readd MockBookKeeper to Pulsar URL: https://github.com/apache/incubator-pulsar/pull/2544
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java new file mode 100644 index 0000000000..f3689e91e2 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; +import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; +import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.OpenBuilderBase; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mocked version of BookKeeper client that keeps all ledgers data in memory. + * + * <p>This mocked client is meant to be used in unit tests for applications using the BookKeeper API. + */ +public class PulsarMockBookKeeper extends BookKeeper { + + final ExecutorService executor; + final ZooKeeper zkc; + + @Override + public ZooKeeper getZkHandle() { + return zkc; + } + + @Override + public ClientConfiguration getConf() { + return super.getConf(); + } + + Map<Long, PulsarMockLedgerHandle> ledgers = new ConcurrentHashMap<>(); + AtomicLong sequence = new AtomicLong(3); + + CompletableFuture<Void> defaultResponse = CompletableFuture.completedFuture(null); + List<CompletableFuture<Void>> failures = new ArrayList<>(); + + public PulsarMockBookKeeper(ZooKeeper zkc, ExecutorService executor) throws Exception { + this.zkc = zkc; + this.executor = executor; + } + + @Override + public LedgerHandle createLedger(DigestType digestType, byte passwd[]) + throws BKException, InterruptedException { + return createLedger(3, 2, digestType, passwd); + } + + @Override + public LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte passwd[]) + throws BKException, InterruptedException { + return createLedger(ensSize, qSize, qSize, digestType, passwd); + } + + @Override + public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, final DigestType digestType, + final byte[] passwd, final CreateCallback cb, final Object ctx, Map<String, byte[]> properties) { + getProgrammedFailure().thenComposeAsync((res) -> { + try { + long id = sequence.getAndIncrement(); + log.info("Creating ledger {}", id); + PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd); + ledgers.put(id, lh); + return FutureUtils.value(lh); + } catch (Throwable t) { + return FutureUtils.exception(t); + } + }, executor).whenCompleteAsync((lh, exception) -> { + if (exception != null) { + cb.createComplete(getExceptionCode(exception), null, ctx); + } else { + cb.createComplete(BKException.Code.OK, lh, ctx); + } + }, executor); + } + + @Override + public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, + byte[] passwd) throws BKException, InterruptedException { + checkProgrammedFail(); + + try { + long id = sequence.getAndIncrement(); + log.info("Creating ledger {}", id); + PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, digestType, passwd); + ledgers.put(id, lh); + return lh; + } catch (Throwable t) { + log.error("Exception:", t); + return null; + } + } + + @Override + public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb, + Object ctx) { + asyncCreateLedger(ensSize, qSize, qSize, digestType, passwd, cb, ctx, Collections.emptyMap()); + } + + @Override + public void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) { + getProgrammedFailure().thenComposeAsync((res) -> { + PulsarMockLedgerHandle lh = ledgers.get(lId); + if (lh == null) { + return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException()); + } else if (lh.digest != digestType) { + return FutureUtils.exception(new BKException.BKDigestMatchException()); + } else if (!Arrays.equals(lh.passwd, passwd)) { + return FutureUtils.exception(new BKException.BKUnauthorizedAccessException()); + } else { + return FutureUtils.value(lh); + } + }, executor).whenCompleteAsync((ledger, exception) -> { + if (exception != null) { + cb.openComplete(getExceptionCode(exception), null, ctx); + } else { + cb.openComplete(BKException.Code.OK, ledger, ctx); + } + }, executor); + } + + @Override + public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) { + asyncOpenLedger(lId, digestType, passwd, cb, ctx); + } + + @Override + public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) { + getProgrammedFailure().thenComposeAsync((res) -> { + if (ledgers.containsKey(lId)) { + ledgers.remove(lId); + return FutureUtils.value(null); + } else { + return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException()); + } + }, executor).whenCompleteAsync((res, exception) -> { + if (exception != null) { + cb.deleteComplete(getExceptionCode(exception), ctx); + } else { + cb.deleteComplete(BKException.Code.OK, ctx); + } + }, executor); + } + + @Override + public void deleteLedger(long lId) throws InterruptedException, BKException { + checkProgrammedFail(); + + if (!ledgers.containsKey(lId)) { + throw BKException.create(BKException.Code.NoSuchLedgerExistsException); + } + + ledgers.remove(lId); + } + + @Override + public void close() throws InterruptedException, BKException { + shutdown(); + } + + @Override + public OpenBuilder newOpenLedgerOp() { + return new OpenBuilderBase() { + @Override + public CompletableFuture<ReadHandle> execute() { + return getProgrammedFailure().thenCompose( + (res) -> { + if (!validate()) { + return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException()); + } + + PulsarMockLedgerHandle lh = ledgers.get(ledgerId); + if (lh == null) { + return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException()); + } else if (lh.digest != DigestType.fromApiDigestType(digestType)) { + return FutureUtils.exception(new BKException.BKDigestMatchException()); + } else if (!Arrays.equals(lh.passwd, password)) { + return FutureUtils.exception(new BKException.BKUnauthorizedAccessException()); + } else { + return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId, + lh.getLedgerMetadata(), lh.entries)); + } + }); + } + }; + } + + public void shutdown() { + try { + super.close(); + } catch (Exception e) { + } + synchronized (this) { + defaultResponse = FutureUtils.exception(new BKException.BKClientClosedException()); + } + for (PulsarMockLedgerHandle ledger : ledgers.values()) { + ledger.entries.clear(); + } + + ledgers.clear(); + } + + public Set<Long> getLedgers() { + return ledgers.keySet(); + } + + void checkProgrammedFail() throws BKException, InterruptedException { + try { + getProgrammedFailure().get(); + } catch (ExecutionException ee) { + if (ee.getCause() instanceof BKException) { + throw (BKException)ee.getCause(); + } else { + throw new BKException.BKUnexpectedConditionException(); + } + } + } + + synchronized CompletableFuture<Void> getProgrammedFailure() { + return failures.isEmpty() ? defaultResponse : failures.remove(0); + } + + public void failNow(int rc) { + failAfter(0, rc); + } + + public void failAfter(int steps, int rc) { + promiseAfter(steps).completeExceptionally(BKException.create(rc)); + } + + public synchronized CompletableFuture<Void> promiseAfter(int steps) { + while (failures.size() <= steps) { + failures.add(defaultResponse); + } + CompletableFuture<Void> promise = new CompletableFuture<>(); + failures.set(steps, promise); + return promise; + } + + static int getExceptionCode(Throwable t) { + if (t instanceof BKException) { + return ((BKException) t).getCode(); + } else if (t.getCause() != null) { + return getExceptionCode(t.getCause()); + } else { + return BKException.Code.UnexpectedConditionException; + } + } + + private static final Logger log = LoggerFactory.getLogger(PulsarMockBookKeeper.class); +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java new file mode 100644 index 0000000000..3037a7899e --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import com.google.common.collect.Lists; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.security.GeneralSecurityException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Enumeration; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.api.WriteFlag; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock BK {@link LedgerHandle}. Used by {@link PulsarMockBookKeeper}. + */ +public class PulsarMockLedgerHandle extends LedgerHandle { + + final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList(); + final PulsarMockBookKeeper bk; + final long id; + final DigestType digest; + final byte[] passwd; + final ReadHandle readHandle; + long lastEntry = -1; + boolean fenced = false; + + PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, + DigestType digest, byte[] passwd) throws GeneralSecurityException { + super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(), + EnumSet.noneOf(WriteFlag.class)); + this.bk = bk; + this.id = id; + this.digest = digest; + this.passwd = Arrays.copyOf(passwd, passwd.length); + + readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries); + } + + @Override + public void asyncClose(CloseCallback cb, Object ctx) { + bk.getProgrammedFailure().thenComposeAsync((res) -> { + fenced = true; + return FutureUtils.value(null); + }, bk.executor).whenCompleteAsync((res, exception) -> { + if (exception != null) { + cb.closeComplete(PulsarMockBookKeeper.getExceptionCode(exception), null, ctx); + } else { + cb.closeComplete(BKException.Code.OK, this, ctx); + } + }, bk.executor); + } + + @Override + public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) { + bk.getProgrammedFailure().thenComposeAsync((res) -> { + log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>(); + long entryId = firstEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate())); + } + + log.debug("Entries read: {}", seq); + + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + + Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() { + @Override + public boolean hasMoreElements() { + return !seq.isEmpty(); + } + + @Override + public LedgerEntry nextElement() { + return seq.remove(); + } + }; + return FutureUtils.value(entries); + }).whenCompleteAsync((res, exception) -> { + if (exception != null) { + cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception), PulsarMockLedgerHandle.this, null, ctx); + } else { + cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx); + } + }, bk.executor); + } + + @Override + public long addEntry(byte[] data) throws InterruptedException, BKException { + try { + bk.checkProgrammedFail(); + } catch (BKException e) { + fenced = true; + throw e; + } + + if (fenced) { + throw BKException.create(BKException.Code.LedgerFencedException); + } + + lastEntry = entries.size(); + entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data))); + return lastEntry; + } + + @Override + public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) { + asyncAddEntry(data, 0, data.length, cb, ctx); + } + + @Override + public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, + final Object ctx) { + asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), cb, ctx); + } + + @Override + public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) { + data.retain(); + bk.getProgrammedFailure().thenComposeAsync((res) -> { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + + if (fenced) { + return FutureUtils.exception(new BKException.BKLedgerFencedException()); + } else { + lastEntry = entries.size(); + byte[] storedData = new byte[data.readableBytes()]; + data.readBytes(storedData); + entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, + storedData.length, Unpooled.wrappedBuffer(storedData))); + return FutureUtils.value(lastEntry); + } + + }, bk.executor).whenCompleteAsync((entryId, exception) -> { + data.release(); + if (exception != null) { + fenced = true; + cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception), + PulsarMockLedgerHandle.this, INVALID_ENTRY_ID, ctx); + } else { + cb.addComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, entryId, ctx); + } + }); + } + + @Override + public long getId() { + return ledgerId; + } + + @Override + public long getLastAddConfirmed() { + return lastEntry; + } + + @Override + public long getLength() { + long length = 0; + for (LedgerEntryImpl entry : entries) { + length += entry.getLength(); + } + + return length; + } + + + // ReadHandle interface + @Override + public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { + return readHandle.readAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { + return readHandle.readUnconfirmedAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture<Long> readLastAddConfirmedAsync() { + return readHandle.readLastAddConfirmedAsync(); + } + + @Override + public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { + return readHandle.tryReadLastAddConfirmedAsync(); + } + + @Override + public boolean isClosed() { + return readHandle.isClosed(); + } + + @Override + public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, + long timeOutInMillis, + boolean parallel) { + return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); + } + + private static final Logger log = LoggerFactory.getLogger(PulsarMockLedgerHandle.class); + +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java new file mode 100644 index 0000000000..30bcf46210 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.common.concurrent.FutureUtils; + +/** + * Mock implementation of ReadHandle. + */ +@Slf4j +class PulsarMockReadHandle implements ReadHandle { + private final PulsarMockBookKeeper bk; + private final long ledgerId; + private final LedgerMetadata metadata; + private final List<LedgerEntryImpl> entries; + + PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, List<LedgerEntryImpl> entries) { + this.bk = bk; + this.ledgerId = ledgerId; + this.metadata = metadata; + this.entries = entries; + } + + @Override + public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { + return bk.getProgrammedFailure().thenComposeAsync((res) -> { + log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + List<LedgerEntry> seq = new ArrayList<>(); + long entryId = firstEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + seq.add(entries.get((int) entryId++).duplicate()); + } + log.debug("Entries read: {}", seq); + + return FutureUtils.value(LedgerEntriesImpl.create(seq)); + }); + } + + @Override + public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { + return readAsync(firstEntry, lastEntry); + } + + @Override + public CompletableFuture<Long> readLastAddConfirmedAsync() { + return CompletableFuture.completedFuture(getLastAddConfirmed()); + } + + @Override + public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { + return readLastAddConfirmedAsync(); + } + + @Override + public long getLastAddConfirmed() { + return entries.get(entries.size() - 1).getEntryId(); + } + + @Override + public long getLength() { + long length = 0; + for (LedgerEntryImpl entry : entries) { + length += entry.getLength(); + } + + return length; + } + + @Override + public boolean isClosed() { + return metadata.isClosed(); + } + + @Override + public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, + long timeOutInMillis, + boolean parallel) { + CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>(); + promise.completeExceptionally(new UnsupportedOperationException("Long poll not implemented")); + return promise; + } + + // Handle interface + @Override + public long getId() { + return ledgerId; + } + + @Override + public CompletableFuture<Void> closeAsync() { + return CompletableFuture.completedFuture(null); + } + + @Override + public LedgerMetadata getLedgerMetadata() { + return metadata; + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 35eb986df5..0f0d78917a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -435,8 +435,8 @@ public void recoverLongTimeAfterMultipleWriteErrors() throws Exception { ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("recoverLongTimeAfterMultipleWriteErrors"); ManagedCursor cursor = ledger.openCursor("c1"); - bkc.failNow(BKException.Code.BookieHandleNotAvailableException, - BKException.Code.BookieHandleNotAvailableException); + bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException); + bkc.failAfter(1, BKException.Code.BookieHandleNotAvailableException); CountDownLatch counter = new CountDownLatch(2); AtomicReference<ManagedLedgerException> ex = new AtomicReference<>(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index e3c18a07d8..a62a1627e9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -22,7 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.bookkeeper.client.MockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; @@ -47,7 +47,7 @@ protected MockZooKeeper zkc; // BookKeeper related variables - protected MockBookKeeper bkc; + protected PulsarMockBookKeeper bkc; protected int numBookies; protected ManagedLedgerFactoryImpl factory; @@ -117,7 +117,7 @@ protected void startBookKeeper() throws Exception { zkc.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(), null, null); - bkc = new MockBookKeeper(zkc); + bkc = new PulsarMockBookKeeper(zkc, executor.chooseThread(this)); } protected void stopBookKeeper() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java index 1922da75c0..728bc67613 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java @@ -18,20 +18,33 @@ */ package org.apache.pulsar.broker; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.MockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; + import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MockedBookKeeperClientFactory implements BookKeeperClientFactory { + private static final Logger log = LoggerFactory.getLogger(MockedBookKeeperClientFactory.class); private final BookKeeper mockedBk; + private final ExecutorService executor; public MockedBookKeeperClientFactory() { try { - mockedBk = new MockBookKeeper(null); + executor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("mock-bk-client-factory") + .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex)) + .build()); + mockedBk = new PulsarMockBookKeeper(null, executor); } catch (Exception e) { throw new RuntimeException(e); } @@ -48,5 +61,6 @@ public void close() { mockedBk.close(); } catch (BKException | InterruptedException e) { } + executor.shutdown(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index bc98efa557..5c64ac8038 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -21,19 +21,23 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.IOException; import java.net.URI; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.MockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.util.ZkUtils; @@ -82,6 +86,7 @@ protected final String configClusterName = "test"; private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor; + private ExecutorService bkExecutor; public MockedPulsarServiceBaseTest() { resetConfig(); @@ -122,10 +127,14 @@ protected final void internalSetupForStatsTest() throws Exception { } protected final void init() throws Exception { - mockZookKeeper = createMockZooKeeper(); - mockBookKeeper = createMockBookKeeper(mockZookKeeper); - sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor(); + bkExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk") + .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex)) + .build()); + + mockZookKeeper = createMockZooKeeper(); + mockBookKeeper = createMockBookKeeper(mockZookKeeper, bkExecutor); startBroker(); @@ -157,6 +166,9 @@ protected final void internalCleanup() throws Exception { if (sameThreadOrderedSafeExecutor != null) { sameThreadOrderedSafeExecutor.shutdown(); } + if (bkExecutor != null) { + bkExecutor.shutdown(); + } } catch (Exception e) { log.warn("Failed to clean up mocked pulsar service:", e); throw e; @@ -221,15 +233,16 @@ public static MockZooKeeper createMockZooKeeper() throws Exception { return zk; } - public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper) throws Exception { - return spy(new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper)); + public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper, + ExecutorService executor) throws Exception { + return spy(new NonClosableMockBookKeeper(zookeeper, executor)); } // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test - public static class NonClosableMockBookKeeper extends MockBookKeeper { + public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper { - public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper zk) throws Exception { - super(zk); + public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService executor) throws Exception { + super(zk, executor); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index c1be8d1b2f..9db657a059 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -113,7 +113,8 @@ public void setup() throws Exception { ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); - doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient(); + doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0))) + .when(pulsar).getBookKeeperClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index f8ac40a945..f93aa944ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -153,7 +153,8 @@ public void setup() throws Exception { ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); - doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient(); + doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0))) + .when(pulsar).getBookKeeperClient(); configCacheService = mock(ConfigurationCacheService.class); @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index f746571f8c..1145321943 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -157,7 +157,8 @@ public void setup() throws Exception { ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); - doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient(); + doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0))) + .when(pulsar).getBookKeeperClient(); configCacheService = mock(ConfigurationCacheService.class); ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index eb88d37411..8fe170dbe9 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -43,7 +43,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerMetadata; -import org.apache.bookkeeper.client.MockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -83,11 +83,11 @@ private static MockZooKeeper createMockZooKeeper() throws Exception { private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024; private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024; final OrderedScheduler scheduler; - final MockBookKeeper bk; + final PulsarMockBookKeeper bk; BlobStoreManagedLedgerOffloaderTest() throws Exception { scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build(); - bk = new MockBookKeeper(createMockZooKeeper()); + bk = new PulsarMockBookKeeper(createMockZooKeeper(), scheduler.chooseThread(this)); } private ReadHandle buildReadHandle() throws Exception { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services