Hi Jerry,

the issue occurs because Flink's storm compatibility layer does not support
custom configuration parameters currently.
There is this JIRA which aims to add the missing feature to Flink:
https://issues.apache.org/jira/browse/FLINK-2525
Maybe (but its unlikely) passing an empty Map in the
AbstractStormSpoutWrapper:

this.spout.open(null,
      StormWrapperSetupHelper
      .convertToTopologyContext((StreamingRuntimeContext)
super.getRuntimeContext(), true),
      new SpoutOutputCollector(this.collector));

would fix the issue. But I suspect that the KafkaSpout needs some
configuration parameters, so we have to wait for FLINK-2525.

Best,
Robert


On Wed, Sep 2, 2015 at 7:58 PM, Jerry Peng <jerry.boyang.p...@gmail.com>
wrote:

> Hello,
>
> When I try to run a storm topology with a Kafka Spout on top of Flink, I
> get an NPE at:
>
> 15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask
>       - Error closing stream operators after an exception.
>
> java.lang.NullPointerException
>
> at storm.kafka.KafkaSpout.close(KafkaSpout.java:130)
>
> at
> org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15:00:32,855 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend for state checkpoints is set to jobmanager.
>
> 15:00:32,855 INFO  org.apache.flink.runtime.taskmanager.Task
>       - event_deserializer (5/5) switched to RUNNING
>
> 15:00:32,859 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Source: ads (1/1) switched to FAILED with exception.
>
> java.lang.NullPointerException
>
> at java.util.HashMap.putMapEntries(HashMap.java:500)
>
> at java.util.HashMap.<init>(HashMap.java:489)
>
> at storm.kafka.KafkaSpout.open(KafkaSpout.java:73)
>
> at
> org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
>
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
> Has someone seen this before? or Have a fix?  I am using 0.10beta1 for all
> storm packages and a 0.10-snapshot (latest compiled) for all flink
> packages.  Sample of the kafka code I am using:
>
> Broker broker = new Broker("localhost", 9092);
> GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
> partitionInfo.addPartition(0, broker);
> StaticHosts hosts = new StaticHosts(partitionInfo);
>
> SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff", 
> UUID.randomUUID().toString());
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> builder.setSpout("kafkaSpout", kafkaSpout, 1);
>
>

Reply via email to