Hi:
I am using the following WikiEdit example:
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/run_example_quickstart.html

It works when printing the contents to a file or stdout.

But I wanted to modify it to use Kinesis instead of Kafka. So instead of the 
Kafka part, I put:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new 
SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("my-flink-stream");
kinesis.setDefaultPartition("0");

result
.map(new MapFunction<Tuple2<String,Long>, String>() {
    @Override
    public String map(Tuple2<String, Long> tuple) {
        return tuple.toString();
    }
})
.addSink(kinesis);

see.execute();


But I get the following error:

2016-08-31 17:05:41,541 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom 
Source (1/1) (2f7d339588fec18e0f2617439ee9be6d) switched from RUNNING to 
CANCELING

2016-08-31 17:05:41,542 INFO  org.apache.flink.yarn.YarnJobManager              
            - Status of job 43a13707d92da260827f37968597c187 () changed to 
FAILING.

java.lang.Exception: Serialized representation of 
org.apache.flink.streaming.runtime.tasks.TimerException: 
java.lang.RuntimeException: Could not forward element to next operator

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:803)

        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)
Searching Google doesn't yield many things that seem to work. Is there 
somewhere I should look for a root cause? I looked in the full log file but 
it's not much more than this stacktrace.

Reply via email to