Hi: I am using the following WikiEdit example: https://ci.apache.org/projects/flink/flink-docs-master/quickstart/run_example_quickstart.html
It works when printing the contents to a file or stdout.

But I wanted to modify it to use Kinesis instead of Kafka. So instead of the Kafka part, I put:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("my-flink-stream");
kinesis.setDefaultPartition("0");

result
    .map(new MapFunction<Tuple2<String,Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(kinesis);

see.execute();

But I get the following error:

2016-08-31 17:05:41,541 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (2f7d339588fec18e0f2617439ee9be6d) switched from RUNNING to CANCELING
2016-08-31 17:05:41,542 INFO org.apache.flink.yarn.YarnJobManager - Status of job 43a13707d92da260827f37968597c187 () changed to FAILING.
java.lang.Exception: Serialized representation of org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:803)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Searching Google doesn't yield many things that seem to work. Is there somewhere I should look for a root cause? I looked in the full log file but it's not much more than this stacktrace.