dlg99 opened a new issue, #4316:
URL: https://github.com/apache/bookkeeper/issues/4316

   **BUG REPORT**
   
   ***Describe the bug***
   
   Similar to https://github.com/apache/bookkeeper/issues/1606 but only happens 
if autoShrink is true.
   This was introduced by https://github.com/apache/bookkeeper/pull/3074 or 
some subsequent change related to autoShrink.
   @lordcheng10 fyi
   
   Error in prod
   ```
   ERROR org.apache.bookkeeper.proto.WriteEntryProcessor - Unexpected exception 
while writing 3901@24558 : Index 34 out of bounds for length 32
   java.lang.ArrayIndexOutOfBoundsException: Index 34 out of bounds for length 
32
           at 
org.apache.bookkeeper.util.collections.ConcurrentLongHashMap$Section.get(ConcurrentLongHashMap.java:357)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.util.collections.ConcurrentLongHashMap.get(ConcurrentLongHashMap.java:204)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.bookie.BookieImpl.addEntryInternal(BookieImpl.java:937) 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.bookie.BookieImpl.addEntry(BookieImpl.java:1074) 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.proto.WriteEntryProcessor.processPacket(WriteEntryProcessor.java:79)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.proto.PacketProcessorBase.run(PacketProcessorBase.java:202)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.proto.BookieRequestProcessor.processAddRequest(BookieRequestProcessor.java:655)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.proto.BookieRequestProcessor.processRequest(BookieRequestProcessor.java:377)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
org.apache.bookkeeper.proto.BookieRequestHandler.channelRead(BookieRequestHandler.java:90)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
org.apache.bookkeeper.proto.AuthHandler$ServerSideHandler.channelRead(AuthHandler.java:89)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
org.apache.bookkeeper.proto.BookieProtoEncoding$RequestDecoder.channelRead(BookieProtoEncoding.java:477)
 
~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
 ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
 ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
 ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
 ~[io.netty-netty-handler-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801)
 ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) 
~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) 
~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
           at 
org.apache.bookkeeper.stats.ThreadRegistry$RegisteredRunnable.run(ThreadRegistry.java:146)
 
~[org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
           at java.lang.Thread.run(Thread.java:842) ~[?:?]
   ```
   
   
   A clear and concise description of what the bug is.
   
   ***To Reproduce***
   
   ```java
   diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
   index f1372b2894..f27d6f335c 100644
   --- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
   +++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
   @@ -348,6 +348,7 @@ public class ConcurrentLongHashMapTest {
            assertEquals(map.size(), n);
        }
    
   +
        @Test
        public void concurrentInsertions() throws Throwable {
            ConcurrentLongHashMap<String> map =
   @@ -488,6 +489,171 @@ public class ConcurrentLongHashMapTest {
            executor.shutdown();
        }
    
   +    @Test
   +    public void stressConcurrentInsertionsAndReads2() throws Throwable {
   +        ConcurrentLongHashMap<String> map =
   +                ConcurrentLongHashMap.<String>newBuilder()
   +                        .concurrencyLevel(4)
   +                        .expectedItems(4)
   +//                        .expandFactor(1.1f)
   +//                        .shrinkFactor(1.1f)
   +                        .autoShrink(true)
   +                        .build();
   +        ExecutorService executor = Executors.newCachedThreadPool();
   +
   +        final int writeThreads = 8;
   +        final int readThreads = 8;
   +        final int n = 1_000_000;
   +        String[] values = new String[] {
   +                "v",
   +                "vv",
   +                "vvv",
   +                "vvvv",
   +                "vvvvv",
   +                "vvvvvv",
   +                "vvvvvvv",
   +                "vvvvvvvv",
   +                "vvvvvvvvv",
   +                "vvvvvvvvvv",
   +        };
   +        final int numValues = values.length;
   +
   +        CyclicBarrier barrier = new CyclicBarrier(writeThreads + 
readThreads);
   +        List<Future<?>> futures = new ArrayList<>();
   +
   +        System.out.println("Starting writes");
   +        for (int i = 0; i < writeThreads; i++) {
   +            final int threadIdx = i;
   +
   +            futures.add(executor.submit(() -> {
   +                Random random = new Random(threadIdx);
   +
   +                try {
   +                    barrier.await();
   +                } catch (Exception e) {
   +                    throw new RuntimeException(e);
   +                }
   +
   +                for (int j = 0; j < n; j++) {
   +                    long key = random.nextLong();
   +                    // Ensure keys are uniques
   +                    key -= key % (threadIdx + 1);
   +
   +                    map.putIfAbsent(key, values[(int)Math.abs(key % 
numValues)]);
   +                }
   +            }));
   +        }
   +
   +        System.out.println("Starting reads");
   +        for (int i = 0; i < readThreads; i++) {
   +            final int threadIdx = i;
   +
   +            futures.add(executor.submit(() -> {
   +                Random random = new Random(threadIdx);
   +
   +                try {
   +                    barrier.await();
   +                } catch (Exception e) {
   +                    throw new RuntimeException(e);
   +                }
   +
   +                for (int j = 0; j < n; j++) {
   +                    long key = random.nextLong();
   +                    // Ensure keys are uniques
   +                    key -= key % (threadIdx + 1);
   +
   +                    String value = map.get(key);
   +                    if (value != null) {
   +                        assertEquals(values[(int) Math.abs(key % 
numValues)], value);
   +                    }
   +                }
   +            }));
   +        }
   +
   +        System.out.println("Waiting for futures");
   +        int count = 0;
   +        for (Future<?> future : futures) {
   +            future.get();
   +            count++;
   +            if (count % 1000 == 0) {
   +                System.out.println("Completed " + count + " futures out of 
" + futures.size());
   +            }
   +        }
   +
   +        assertEquals(map.size(), n * writeThreads);
   +
   +        futures.clear();
   +        barrier.reset();
   +
   +        System.out.println("Starting removes");
   +        for (int i = 0; i < writeThreads; i++) {
   +            final int threadIdx = i;
   +
   +            futures.add(executor.submit(() -> {
   +                Random random = new Random(threadIdx);
   +
   +                try {
   +                    barrier.await();
   +                } catch (Exception e) {
   +                    throw new RuntimeException(e);
   +                }
   +
   +                for (int j = 0; j < n; j++) {
   +                    long key = random.nextLong();
   +                    // Ensure keys are uniques
   +                    key -= key % (threadIdx + 1);
   +
   +                    map.putIfAbsent(key, values[(int)Math.abs(key % 
numValues)]);
   +                    map.remove(key);
   +                    String value = map.get(key);
   +                    assertNull(value);
   +
   +                }
   +            }));
   +        }
   +
   +        System.out.println("Starting reads 2");
   +        for (int i = 0; i < readThreads; i++) {
   +            final int threadIdx = i;
   +
   +            //for (int k = 0; k < 4; k++) {
   +                futures.add(executor.submit(() -> {
   +                    Random random = new Random(threadIdx);
   +
   +                    try {
   +                        barrier.await();
   +                    } catch (Exception e) {
   +                        throw new RuntimeException(e);
   +                    }
   +
   +                    for (int j = 0; j < n; j++) {
   +                        long key = random.nextLong();
   +                        // Ensure keys are uniques
   +                        key -= key % (threadIdx + 1);
   +
   +                        String value = map.get(key);
   +                        if (value != null) {
   +                            assertEquals(values[(int) Math.abs(key % 
numValues)], value);
   +                        }
   +                    }
   +                }));
   +            //}
   +        }
   +
   +        System.out.println("Waiting for futures 2");
   +        count = 0;
   +        for (Future<?> future : futures) {
   +            future.get();
   +            count++;
   +            if (count % 1000 == 0) {
   +                System.out.println("Completed " + count + " futures out of 
" + futures.size());
   +            }
   +        }
   +        futures.clear();
   +
   +        executor.shutdown();
   +    }
   +
        @Test
        public void testIteration() {
            ConcurrentLongHashMap<String> map =
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to