Can you show how CustomerMessage is defined?

Thanks

On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <flinken...@gmail.com>
wrote:

> Folks,
>
> I wrote a custom data source to test my CEP logic. The custom data source
> looks like this:
>
> public class CustomerDataSource extends RichParallelSourceFunction<CustomerMessage> {
>     // volatile so that cancel(), called from another thread, is seen by run()
>     private volatile boolean running = true;
>     private final Random random;
>
>     public CustomerDataSource() {
>         this.random = new Random();
>     }
>
>     @Override
>     public void run(SourceContext<CustomerMessage> ctx) throws Exception {
>         while (running) {
>             new CustomerDataGen().generateMessages().
>                     forEach(element -> ctx.collect(element));
>
>             Thread.sleep(10000);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> public class CustomerDataGen {
>     private final Random random;
>
>     public CustomerDataGen() {
>         this.random = new Random();
>     }
>
>     public List<CustomerMessage> generateMessages() throws InterruptedException {
>         List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
>
>         messages.add(getMessage());
>         return messages;
>     }
>
>     private CustomerMessage getMessage() {
>         Instant time = Instant.now();
>         Timestamp eventTimeStamp = Timestamp.newBuilder()
>                 .setSeconds(time.getEpochSecond()).setNanos(0).build();
>         Timestamp creationTimeStamp = Timestamp.newBuilder()
>                 .setSeconds(time.getEpochSecond() - 1).setNanos(0).build();
>         return CustomerMessage.newBuilder().
>                 setName("SomeCustomer").
>                 setEventTimestamp(eventTimeStamp).
>                 setCustomerId("01234").
>                 addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
>                 setEmail("custo...@foo.com").
>                 build();
>     }
> }
>
> In my Main program :
>
> .........
> env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
> env.addSource(new CustomerDataSource());
>
> env.execute();
>
>
> When I run the program, I get the following exception:
>
>
> Caused by: java.lang.NullPointerException
>       at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
>       at java.util.AbstractList.add(AbstractList.java:108)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
>       at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>       at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>       at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>       at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>       at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
>       at java.util.ArrayList.forEach(ArrayList.java:1249)
>       at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:745)
>
> I am having a tough time figuring out why. Can someone help me figure out
> where I am going wrong?
>
>
>
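
One possible reading of the trace above, offered as an assumption and not a confirmed diagnosis: the NullPointerException is raised while Kryo's generic CollectionSerializer copies a protobuf UnmodifiableLazyStringList held inside a record that Flink handles with PojoSerializer, so the ProtobufSerializer registered for CustomerMessage may not be the serializer on that code path. If that is what is happening, one workaround is to copy list-valued protobuf fields into a plain ArrayList before they are stored in a POJO. The sketch below uses only the JDK. PlainListCopy and toPlainList are hypothetical names, and Collections.unmodifiableList stands in for the protobuf list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink, Kryo, or protobuf.
final class PlainListCopy {

    // Copies any list into a plain ArrayList. ArrayList has a no-arg
    // constructor and a working add(), which is what a generic
    // element-by-element collection copier relies on.
    static <T> List<T> toPlainList(List<T> source) {
        return new ArrayList<>(source);
    }

    public static void main(String[] args) {
        // Stand-in (assumption) for the read-only list returned by a
        // protobuf repeated-string getter.
        List<String> readOnly = Collections.unmodifiableList(
                Arrays.asList("+9185216741972"));

        List<String> plain = toPlainList(readOnly);
        plain.add("another-number"); // succeeds because the copy is mutable

        System.out.println(plain.getClass().getSimpleName() + " " + plain);
    }
}
```

Whether this applies depends on how CustomerMessage and any wrapping POJO are defined, which the posted code does not show.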
