This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3923643  Fix BookkeeperSchemaStorage NPE (#9264)
3923643 is described below

commit 3923643d36618c1c7f6b4d13219fbb8ddea31a73
Author: lipenghui <peng...@apache.org>
AuthorDate: Fri Feb 5 14:47:26 2021 +0800

    Fix BookkeeperSchemaStorage NPE (#9264)
    
    ### Motivation
    
    A NullPointerException was thrown when ZooKeeper had an OOM issue. 
After we increased the ZooKeeper memory and restarted the ZooKeeper cluster, 
the broker still kept throwing NullPointerException. The exception only went 
away after a rolling restart of all brokers.
    
    ```
    07:54:13.142 [pulsar-io-25-1] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978] Subscribing on 
topic [topic] / [subscription]
    07:54:13.143 [Thread-241] WARN  org.apache.pulsar.broker.service.ServerCnx 
- [/10.0.168.5:42978][topic][subscription] Failed to create consumer: null
    java.util.concurrent.CompletionException: java.lang.NullPointerException
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1005)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) 
~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$getSchema$6(BookkeeperSchemaStorage.java:175)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
 ~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.getSchema(BookkeeperSchemaStorage.java:169)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.get(BookkeeperSchemaStorage.java:126)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:95)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:81)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator.getSchema(SchemaRegistryServiceWithSchemaDataValidator.java:52)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.AbstractTopic.hasSchema(AbstractTopic.java:244)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.addSchemaIfIdleOrCheckCompatible(PersistentTopic.java:2144)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:920) 
~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) 
~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:902)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
 ~[?:1.8.0_252]
        at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) 
~[?:1.8.0_252]
        at 
org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:852) 
~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:239)
 ~[org.apache.pulsar-pulsar-common-2.6.2.jar:2.6.2]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191) 
~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153)
 ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
 ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
 ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
 
~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
        at 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) 
~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) 
~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
    Caused by: java.lang.NullPointerException
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.openLedger(BookkeeperSchemaStorage.java:565)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.readSchemaEntry(BookkeeperSchemaStorage.java:470)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$4(BookkeeperSchemaStorage.java:185)
 ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
 ~[?:1.8.0_252]
        ... 43 more
    ```
    
    The problem is that the BookKeeper client used by `BookkeeperSchemaStorage` 
was not created successfully because of the ZooKeeper issue. Currently, 
starting the broker creates and starts the SchemaStorage, but if the 
SchemaStorage fails to start, the broker only logs 
`Unable to create schema registry storage`. After that, the broker continues 
the startup process; if the subsequent steps do not throw any exceptions, the 
broker starts successfully. However, the bookkeeper clien [...]
    
    ### Modifications
    
    Make sure the SchemaStorage starts successfully when starting the broker; 
if the SchemaStorage fails to start, the broker startup should fail as well.
---
 .../java/org/apache/pulsar/broker/PulsarService.java    | 17 ++++++-----------
 .../pulsar/broker/service/schema/SchemaServiceTest.java | 10 ++++++++++
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f76ca45..aa893d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1040,17 +1040,12 @@ public class PulsarService implements AutoCloseable {
         }
     }
 
-    private SchemaStorage createAndStartSchemaStorage() {
-        SchemaStorage schemaStorage = null;
-        try {
-            final Class<?> storageClass = 
Class.forName(config.getSchemaRegistryStorageClassName());
-            Object factoryInstance = storageClass.newInstance();
-            Method createMethod = storageClass.getMethod("create", 
PulsarService.class);
-            schemaStorage = (SchemaStorage) 
createMethod.invoke(factoryInstance, this);
-            schemaStorage.start();
-        } catch (Exception e) {
-            LOG.warn("Unable to create schema registry storage");
-        }
+    private SchemaStorage createAndStartSchemaStorage() throws Exception {
+        final Class<?> storageClass = 
Class.forName(config.getSchemaRegistryStorageClassName());
+        Object factoryInstance = storageClass.newInstance();
+        Method createMethod = storageClass.getMethod("create", 
PulsarService.class);
+        SchemaStorage schemaStorage = (SchemaStorage) 
createMethod.invoke(factoryInstance, this);
+        schemaStorage.start();
         return schemaStorage;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 2507f7d..6b2f192 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -37,10 +37,13 @@ import java.util.concurrent.ExecutionException;
 
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
+import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -87,6 +90,7 @@ public class SchemaServiceTest extends 
MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        
conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory");
         super.internalSetup();
         BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
         storage.init();
@@ -319,6 +323,12 @@ public class SchemaServiceTest extends 
MockedPulsarServiceBaseTest {
         putSchema(schemaId1, schemaData3, version(2), 
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
     }
 
+    @Test(expectedExceptions = PulsarServerException.class)
+    public void testSchemaStorageFailed() throws Exception {
+        conf.setSchemaRegistryStorageClassName("Unknown class name");
+        restartBroker();
+    }
+
     private void putSchema(String schemaId, SchemaData schema, SchemaVersion 
expectedVersion) throws Exception {
         putSchema(schemaId, schema, expectedVersion, 
SchemaCompatibilityStrategy.FULL);
     }

Reply via email to