What is the suggested way to validate SQL?

2019-12-27 Thread Kaibo Zhou
Hi,

As a platform user, I want to integrate Flink SQL with the platform. The
usage scenario is: users register tables/UDFs to the catalog service and
then write SQL scripts like "insert into xxx select from xxx" through a web
SQL editor. The platform needs to validate the SQL script each time the
user changes it.
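
For context, a minimal sketch of the registration step (assuming a Flink
1.9-style TableEnvironment; the table name, fields, UDF, and connector
properties are all invented placeholders, not the platform's real catalog):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.functions.ScalarFunction

// A UDF the catalog service might register on the user's behalf (hypothetical).
class MyUpper extends ScalarFunction {
  def eval(s: String): String = s.toUpperCase
}

object CatalogRegistration {
  def main(args: Array[String]): Unit = {
    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Register the UDF under a name that SQL scripts can reference.
    tEnv.registerFunction("my_upper", new MyUpper)

    // Register a table via DDL; the WITH properties are placeholders that
    // would come from the catalog service in practice.
    tEnv.sqlUpdate(
      """CREATE TABLE sourceTable (
        |  f1 STRING,
        |  f2 STRING
        |) WITH (
        |  'connector.type' = 'filesystem',
        |  'connector.path' = '/tmp/source.csv',
        |  'format.type' = 'csv'
        |)""".stripMargin)
  }
}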

One problem I encountered is that SQL validation depends on the connector
jars, which leads to many problems. More details can be found in the
issue [1] I just submitted.

Another problem I found: when I use `tEnv.sqlUpdate("INSERT INTO
sinkTable SELECT f1,f2 FROM sourceTable");` for SQL validation, it does
NOT validate the sink table, including its schema and table name.
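
To illustrate, a minimal sketch of this validation attempt (assuming the
tables are already registered as sketched above; the try/catch is only
illustrative, not an official validation API):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException}

object SqlValidation {
  def main(args: Array[String]): Unit = {
    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // sourceTable and sinkTable are assumed to be registered already,
    // e.g. by the catalog service.
    try {
      // Parses and validates the query side; as noted above, the sink table
      // is not fully validated at this point.
      tEnv.sqlUpdate("INSERT INTO sinkTable SELECT f1, f2 FROM sourceTable")
      println("SQL accepted")
    } catch {
      case e: ValidationException => println(s"SQL rejected: ${e.getMessage}")
    }
  }
}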

I am confused: what is the suggested way to validate Flink SQL? Maybe
Flink could provide a recommended way so that SQL can be easily integrated
by external platforms.

[1]: https://issues.apache.org/jira/browse/FLINK-15419

Best,
Kaibo


Problem when using the Kafka sink with EXACTLY_ONCE in IDEA

2019-01-01 Thread Kaibo Zhou
Hi,
I encountered an error while running the Kafka sink demo in IDEA.

This is the complete code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object kafka_test {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoint"))

    val config = env.getCheckpointConfig
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    config.setCheckpointInterval(15 * 1000)

    // NOTE: the port was elided in the archived message; 9999 is assumed
    // here and must match the port passed to `nc -l`.
    val event = env.socketTextStream("localhost", 9999)

    val propsTarget = new Properties()
    propsTarget.setProperty("bootstrap.servers", "127.0.0.1:9092")
    propsTarget.setProperty("enable.idempotence", "true")

    val outputProducer = new FlinkKafkaProducer011[String](
      "test-output",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      propsTarget,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // ok when changed to Semantic.AT_LEAST_ONCE
    )

    event.addSink(outputProducer).name("sink_to_kafka")
    env.execute()
  }
}

Start the command "nc -l 9999" (using the port assumed in the code above)
before running the above code.
Error message:

7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : 73be1e1168f91ee2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  - Starting FlinkKafkaProducer (1/1) to produce into default topic test-output
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId Source: Socket Stream -> Sink: sink_to_kafka-7df19f87deec5680128845fd9a6ca18d-6] ProducerId set to -1 with epoch -1
7199 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3).
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) [FAILED]
7201 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FAILED to JobManager for task Source: Socket Stream -> Sink: sink_to_kafka a7cea618f99152987bb4a52b4d1df0e3.
7201 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7201 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state RUNNING to FAILING.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7202 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Discarding the results produced by task execution a7cea618f99152987bb4a52b4d1df0e3.
7202 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Try to restart or fail the job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) if no longer possible.
7202 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state FA

Re: Streaming Exception

2017-03-10 Thread Kaibo Zhou
I think this is not the root cause of the job failure; this task failed
because other tasks failed first. You can check the log of the first task
that failed.
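
On the back pressure question quoted below: the JobManager's web monitor
exposes a REST endpoint for back pressure samples. A minimal sketch, where
the host, port, and job/vertex IDs are placeholders you would look up first:

import scala.io.Source

object BackPressureQuery {
  def main(args: Array[String]): Unit = {
    // Placeholders: find the job ID under /jobs and the vertex ID under
    // /jobs/<job-id>, then query the back pressure endpoint.
    val jobManager = "http://localhost:8081"
    val jobId = "<job-id>"
    val vertexId = "<vertex-id>"

    // The first request triggers a sample; poll until results are ready.
    val url = s"$jobManager/jobs/$jobId/vertices/$vertexId/backpressure"
    println(Source.fromURL(url).mkString) // JSON with per-subtask back pressure ratios
  }
}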

2017-03-10 12:25 GMT+08:00 Govindarajan Srinivasaraghavan <
govindragh...@gmail.com>:

> Hi All,
>
> I see the below error after running my streaming job for a while and when
> the load increases. After a while the task manager becomes completely dead
> and the job keeps restarting.
>
> Also, when I checked for back pressure in the UI, it kept saying "sampling
> in progress" and no results were displayed. Is there an API which can
> provide the back pressure details?
>
> 2017-03-10 01:40:58,793 WARN  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while emitting latency marker.
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:426)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
>         at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:152)
>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:117)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:848)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:708)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:690)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:423)
>         ... 10 more
> Caused by: java.lang.InterruptedException
>         at java.lang.Object.wait(Native Method)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:107)
>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:114)
>         ... 14 more