What is the suggested way to validate SQL?
Hi,

As a platform user, I want to integrate Flink SQL with the platform. The usage scenario is: users register tables/UDFs to the catalog service, and then write SQL scripts like "insert into xxx select from xxx" through a web SQL editor. The platform needs to validate the SQL script each time the user changes it.

One problem I encountered is that SQL validation depends on the connector jar, which leads to many problems. More details are in the issue [1] I just submitted.

Another problem I found: when I use `tEnv.sqlUpdate("INSERT INTO sinkTable SELECT f1,f2 FROM sourceTable");` to do SQL validation, it does NOT validate the sink table (neither the schema nor the table name).

I am confused about what the suggested way to validate a Flink SQL statement is. Maybe Flink could provide a recommended approach so that SQL can be easily integrated by external platforms.

[1]: https://issues.apache.org/jira/browse/FLINK-15419

Best,
Kaibo
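For context, a workaround sometimes used on the Flink 1.9/1.10 API is to run the statement through a TableEnvironment and treat any thrown exception as a validation failure. A minimal sketch along those lines (the helper name and the INSERT/SELECT dispatch are illustrative, not an official Flink validation entry point, and it inherits the limitations described above):

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object SqlValidation {

      // Illustrative helper: returns the error message when Flink's
      // parser/validator rejects the statement, None when it is accepted.
      def validate(tEnv: TableEnvironment, statement: String): Option[String] = {
        try {
          if (statement.trim.toLowerCase.startsWith("insert")) {
            // sqlUpdate parses the INSERT but only buffers it; nothing runs
            // until tEnv.execute(). As noted above, it may not fully validate
            // the sink table in these versions.
            tEnv.sqlUpdate(statement)
          } else {
            // sqlQuery validates a SELECT against the registered catalog objects.
            tEnv.sqlQuery(statement)
          }
          None
        } catch {
          case e: Exception => Some(e.getMessage)
        }
      }

      def main(args: Array[String]): Unit = {
        val settings =
          EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val tEnv = TableEnvironment.create(settings)
        // Tables/UDFs from the catalog service would be registered here.
        println(validate(tEnv, "SELECT f1, f2 FROM sourceTable"))
      }
    }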
Problem when using Kafka sink with EXACTLY_ONCE in IDEA
Hi, I encountered an error while running the Kafka sink demo in IDEA. This is the complete code:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

    object kafka_test {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStateBackend(new FsStateBackend("file:///tmp/checkpoint"))

        val config = env.getCheckpointConfig
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        config.setCheckpointInterval(15 * 1000)

        // Port 9999 assumed; the original message omitted it.
        val event = env.socketTextStream("localhost", 9999)

        val propsTarget = new Properties()
        propsTarget.setProperty("bootstrap.servers", "127.0.0.1:9092")
        propsTarget.setProperty("enable.idempotence", "true")

        val outputProducer = new FlinkKafkaProducer011[String](
          "test-output",
          new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
          propsTarget,
          FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // ok when changed to Semantic.AT_LEAST_ONCE
        )

        event.addSink(outputProducer).name("sink_to_kafka")
        env.execute()
      }
    }

Start the command "nc -l 9999" (port assumed to match the code above) before running it.

Error message:

7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (1/1) to produce into default topic test-output
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId Source: Socket Stream -> Sink: sink_to_kafka-7df19f87deec5680128845fd9a6ca18d-6] ProducerId set to -1 with epoch -1
7199 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3).
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) [FAILED]
7201 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Socket Stream -> Sink: sink_to_kafka a7cea618f99152987bb4a52b4d1df0e3.
7201 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7201 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state RUNNING to FAILING.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7202 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Discarding the results produced by task execution a7cea618f99152987bb4a52b4d1df0e3.
7202 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) if no longer possible.
7202 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state FA
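As an aside not from the original report: the Flink docs for FlinkKafkaProducer011 note that Semantic.EXACTLY_ONCE relies on broker-side transaction support, and that the Flink producer defaults transaction.timeout.ms to 1 hour while brokers cap it via transaction.max.timeout.ms (15 minutes by default), so one of the two must be adjusted. A sketch of the property change, applied to the propsTarget above and assuming a transaction-capable broker:

    // Assumption (not from the original message): the broker supports
    // transactions (Kafka 0.11+). Keep the producer's requested transaction
    // timeout within the broker's transaction.max.timeout.ms limit.
    propsTarget.setProperty("transaction.timeout.ms", (15 * 60 * 1000).toString)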
Re: Streaming Exception
I think this is not the root cause of the job failure; this task failed because other tasks failed first. You can check the log of the first failed task.

2017-03-10 12:25 GMT+08:00 Govindarajan Srinivasaraghavan <govindragh...@gmail.com>:

> Hi All,
>
> I see the below error after running my streaming job for a while, when
> the load increases. After a while the task manager becomes completely dead
> and the job keeps restarting.
>
> Also, when I checked for back pressure in the UI, it kept saying
> "sampling in progress" and no results were displayed. Is there an API
> which can provide the back pressure details?
>
> 2017-03-10 01:40:58,793 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:426)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
>     at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:152)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:117)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:708)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:690)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:423)
>     ... 10 more
> Caused by: java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:114)
>     ... 14 more
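On the quoted back pressure question: Flink's monitoring REST API exposes per-vertex back pressure sampling. A minimal sketch of polling it (the host, job id, and vertex id are placeholders, and the exact response shape may differ by Flink version):

    import scala.io.Source

    object BackPressureCheck {
      def main(args: Array[String]): Unit = {
        // Placeholders: the JobManager web address plus the job and vertex ids,
        // which can be looked up via /jobs and /jobs/<jobId>.
        val jobManager = "http://localhost:8081"
        val jobId      = args(0)
        val vertexId   = args(1)
        // The first request triggers sampling (the UI's "sampling in progress");
        // poll until the measured back pressure ratios are returned.
        val url = s"$jobManager/jobs/$jobId/vertices/$vertexId/backpressure"
        println(Source.fromURL(url).mkString)
      }
    }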