dlg99 opened a new issue, #4316: URL: https://github.com/apache/bookkeeper/issues/4316
**BUG REPORT** ***Describe the bug*** Similar to https://github.com/apache/bookkeeper/issues/1606 but only happens if autoShrink is true. This was introduced by https://github.com/apache/bookkeeper/pull/3074 or some subsequent change related to autoShrink. @lordcheng10 fyi Error in prod ``` ERROR org.apache.bookkeeper.proto.WriteEntryProcessor - Unexpected exception while writing 3901@24558 : Index 34 out of bounds for length 32 java.lang.ArrayIndexOutOfBoundsException: Index 34 out of bounds for length 32 at org.apache.bookkeeper.util.collections.ConcurrentLongHashMap$Section.get(ConcurrentLongHashMap.java:357) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at org.apache.bookkeeper.util.collections.ConcurrentLongHashMap.get(ConcurrentLongHashMap.java:204) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at org.apache.bookkeeper.bookie.BookieImpl.addEntryInternal(BookieImpl.java:937) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at org.apache.bookkeeper.bookie.BookieImpl.addEntry(BookieImpl.java:1074) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at org.apache.bookkeeper.proto.WriteEntryProcessor.processPacket(WriteEntryProcessor.java:79) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at org.apache.bookkeeper.proto.PacketProcessorBase.run(PacketProcessorBase.java:202) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at org.apache.bookkeeper.proto.BookieRequestProcessor.processAddRequest(BookieRequestProcessor.java:655) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at org.apache.bookkeeper.proto.BookieRequestProcessor.processRequest(BookieRequestProcessor.java:377) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at 
org.apache.bookkeeper.proto.BookieRequestHandler.channelRead(BookieRequestHandler.java:90) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at org.apache.bookkeeper.proto.AuthHandler$ServerSideHandler.channelRead(AuthHandler.java:89) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at org.apache.bookkeeper.proto.BookieProtoEncoding$RequestDecoder.channelRead(BookieProtoEncoding.java:477) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) 
~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final] at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final] at org.apache.bookkeeper.stats.ThreadRegistry$RegisteredRunnable.run(ThreadRegistry.java:146) ~[org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final] at java.lang.Thread.run(Thread.java:842) ~[?:?] 
``` A clear and concise description of what the bug is. ***To Reproduce*** ```java diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java index f1372b2894..f27d6f335c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java @@ -348,6 +348,7 @@ public class ConcurrentLongHashMapTest { assertEquals(map.size(), n); } + @Test public void concurrentInsertions() throws Throwable { ConcurrentLongHashMap<String> map = @@ -488,6 +489,171 @@ public class ConcurrentLongHashMapTest { executor.shutdown(); } + @Test + public void stressConcurrentInsertionsAndReads2() throws Throwable { + ConcurrentLongHashMap<String> map = + ConcurrentLongHashMap.<String>newBuilder() + .concurrencyLevel(4) + .expectedItems(4) +// .expandFactor(1.1f) +// .shrinkFactor(1.1f) + .autoShrink(true) + .build(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int writeThreads = 8; + final int readThreads = 8; + final int n = 1_000_000; + String[] values = new String[] { + "v", + "vv", + "vvv", + "vvvv", + "vvvvv", + "vvvvvv", + "vvvvvvv", + "vvvvvvvv", + "vvvvvvvvv", + "vvvvvvvvvv", + }; + final int numValues = values.length; + + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + List<Future<?>> futures = new ArrayList<>(); + + System.out.println("Starting writes"); + for (int i = 0; i < writeThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + 
map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]); + } + })); + } + + System.out.println("Starting reads"); + for (int i = 0; i < readThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + String value = map.get(key); + if (value != null) { + assertEquals(values[(int) Math.abs(key % numValues)], value); + } + } + })); + } + + System.out.println("Waiting for futures"); + int count = 0; + for (Future<?> future : futures) { + future.get(); + count++; + if (count % 1000 == 0) { + System.out.println("Completed " + count + " futures out of " + futures.size()); + } + } + + assertEquals(map.size(), n * writeThreads); + + futures.clear(); + barrier.reset(); + + System.out.println("Starting removes"); + for (int i = 0; i < writeThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]); + map.remove(key); + String value = map.get(key); + assertNull(value); + + } + })); + } + + System.out.println("Starting reads 2"); + for (int i = 0; i < readThreads; i++) { + final int threadIdx = i; + + //for (int k = 0; k < 4; k++) { + futures.add(executor.submit(() -> { + Random random = new Random(threadIdx); + + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int j = 0; j < n; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + String value = 
map.get(key); + if (value != null) { + assertEquals(values[(int) Math.abs(key % numValues)], value); + } + } + })); + //} + } + + System.out.println("Waiting for futures 2"); + count = 0; + for (Future<?> future : futures) { + future.get(); + count++; + if (count % 1000 == 0) { + System.out.println("Completed " + count + " futures out of " + futures.size()); + } + } + futures.clear(); + + executor.shutdown(); + } + @Test public void testIteration() { ConcurrentLongHashMap<String> map = ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
