I searched in Flink (and HBase) for GeneratedMessageV3 but didn't find any references to it.

Which version of protobuf did you use to generate the class?
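
In case it helps answer the version question, here is a minimal sketch that prints the superclass chain of a class. The class name SuperclassChain and the default argument are placeholders of mine, not anything from Flink or protobuf. My understanding is that classes generated by protobuf-java 3.x extend GeneratedMessageV3 while 2.x ones extend GeneratedMessage, so running this against CustomerMessage (with your jar on the classpath) should hint at which generator was used. Note that the package may appear relocated, as the shaded.com.google.protobuf frames in your trace suggest.

```java
import java.util.ArrayList;
import java.util.List;

final class SuperclassChain {
    // Returns the fully qualified names of cls and all of its superclasses,
    // most specific class first, ending with java.lang.Object.
    static List<String> superclassChain(Class<?> cls) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = cls; c != null; c = c.getSuperclass()) {
            names.add(c.getName());
        }
        return names;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass the fully qualified name of the generated class as the first argument.
        // java.util.ArrayList is only a stand-in default so the sketch runs on its own.
        String className = args.length > 0 ? args[0] : "java.util.ArrayList";
        superclassChain(Class.forName(className)).forEach(System.out::println);
    }
}
```

Run it with the fully qualified name of the generated class as the only argument and look for GeneratedMessageV3 or GeneratedMessage in the output.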

Please copy user@ in the future so that more people can help.

>> Can you show how CustomerMessage is defined?
>> Thanks
>>> Folks,
>>> I wrote a custom data source to test my CEP logic. The custom data
>>> source looks like this:
>>> public class CustomerDataSource extends RichParallelSourceFunction<CustomerMessage> {
>>>     // volatile so that cancel(), called from another thread, is seen by run()
>>>     private volatile boolean running = true;
>>>     private final Random random;
>>>     public CustomerDataSource() {
>>>         this.random = new Random();
>>>     }
>>>     @Override
>>>     public void run(SourceContext<CustomerMessage> ctx) throws Exception {
>>>         while (running) {
>>>             new CustomerDataGen().generateMessages().
>>>                     forEach(element -> ctx.collect(element));
>>>             Thread.sleep(10000);
>>>         }
>>>     }
>>>     @Override
>>>     public void cancel() {
>>>         running = false;
>>>     }
>>> }
>>> public class CustomerDataGen {
>>>     private final Random random;
>>>     public CustomerDataGen() {
>>>         this.random = new Random();
>>>     }
>>>     public List<CustomerMessage> generateMessages() throws InterruptedException {
>>>         List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
>>>         messages.add(getMessage());
>>>         return messages;
>>>     }
>>>     private CustomerMessage getMessage() {
>>>         Instant time = Instant.now();
>>>         Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
>>>         Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() - 1).setNanos(0).build();
>>>         return CustomerMessage.newBuilder().
>>>                 setName("SomeCustomer").
>>>                 setEventTimestamp(eventTimeStamp).
>>>                 setCustomerId("01234").
>>>                 addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
>>>                 setEmail("custo...@foo.com").
>>>                 build();
>>>     }
>>> }
>>> In my main program:
>>> .........
>>> env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
>>> env.addSource(new CustomerDataSource());
>>> env.execute();
>>> When I run the program, I get the following exception:
>>> Caused by: java.lang.NullPointerException
>>>     at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
>>>     at java.util.AbstractList.add(AbstractList.java:108)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
>>>     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>     at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
>>>     at java.util.ArrayList.forEach(ArrayList.java:1249)
>>>     at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> I am having a tough time figuring out why. Can someone help me work out
>>> where I am going wrong?

