Re:Re: flink1.9 blink planner table ddl 使用问题

2019-08-25 文章 hb
使用了你的ddl语句,还是报一样的错误.
我是在idea里面执行的,maven 配置的依赖.

在 2019-08-26 11:22:20,"Jark Wu"  写道:
>Hi,
>
>初步看下来你的 DDL 中有这几部分定义的有问题。
>
>1. 缺少format properties
>2. 缺少 connector.version
>3. bootstrap.severs 的配置方式写的不对...
>
>
>你可以参考下面这个作为example:
>
>
>CREATE TABLE kafka_json_source (
>rowtime TIMESTAMP,
>user_name VARCHAR,
>event ROW
>) WITH (
>'connector.type' = 'kafka',
>'connector.version' = 'universal',
>'connector.topic' = 'test-json',
>'connector.startup-mode' = 'earliest-offset',
>'connector.properties.0.key' = 'zookeeper.connect',
>'connector.properties.0.value' = 'localhost:2181',
>'connector.properties.1.key' = 'bootstrap.servers',
>'connector.properties.1.value' = 'localhost:9092',
>'update-mode' = 'append',
>'format.type' = 'json',
>'format.derive-schema' = 'true'
>);
>
>
>Kafka 中的数据长这个样子:
>
>{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { 
>"message_type": "WARNING", "message": "This is a warning."}}
>
>
>Best,
>Jark
>
>
>> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道:
>> 
>> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
>> 需要实现TableSourceFactory,还是其他什么.
>> 
>> 
>> 提示:  
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> SQL validation failed. findAndCreateTableSource failed.
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
>> not find a suitable table factory for 
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>> 
>> 
>> 
>> 
>> 代码:
>> ```
>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>> import org.apache.flink.types.Row
>> 
>> 
>> object KafkaInDDL extends App {
>>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>>  val settings: EnvironmentSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
>> settings)
>> 
>> 
>>  val sourceDDL =
>>"""create table sourceTable(
>>id int,
>>name varchar
>>  ) with (
>>'connector.type' = 'kafka',
>>'connector.property-version' = '1',
>>'update-mode' = 'append',
>>'bootstrap.servers' = '192.168.1.160:19092',
>>'connector.topic' = 'hbtest1',
>>'connector.startup-mode' = 'earliest-offset'
>>  )
>>"""
>>  tEnv.sqlUpdate(sourceDDL)
>>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>  tEnv.execute("")
>> }
>> ```
>


Re: flink 1.9.0 StreamTableEnvironment 编译错误

2019-08-25 文章 ddwcg
按照两位的方法修改后已经可以了,谢谢两位

> 在 2019年8月26日,12:28,Jark Wu  写道:
> 
> Hi,
> 
> 关于 Expression的问题,你需要额外加入 import org.apache.flink.table.api._ 的 import。
> 
> See release note for more details: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala
>  
> 
> 
>> 在 2019年8月26日,11:53,Zili Chen  写道:
>> 
>> 不应该呀,我看到仍然有 
>> 
>> def registerDataStream[T](name: String, dataStream: DataStream[T], fields: 
>> Expression*): Unit
>> 
>> 这个方法的,你能提供完整一点的上下文和报错吗?
>> 
>> Best,
>> tison.
>> 
>> 
>> ddwcg <3149768...@qq.com > 于2019年8月26日周一 上午11:38写道:
>> 感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression
>> 总是感觉 java api 和scala api有点混乱了
>> 
>> 
>> 
>>> 在 2019年8月26日,11:22,Zili Chen >> > 写道:
>>> 
>>> 试试把
>>> 
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> 
>>> 换成
>>> 
>>> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
>>> 
>>> 应该是意外 import 了不同包下的同名类的缘故
>>> 
>>> Best,
>>> tison.
>>> 
>>> 
>>> ddwcg <3149768...@qq.com > 于2019年8月26日周一 
>>> 上午11:12写道:
>>> 
 大家好,
 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,
 
 import org.apache.flink.streaming.api.CheckpointingMode
 import 
 org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
 import 
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 import org.apache.flink.table.planner.expressions.StddevPop
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 
 object StreamingJob {
 def main(args: Array[String]) {
   val kafkaTopic = "source.kafka.topic"
   val jobName ="test"
   val parallelism =1
   val checkPointPath ="checkpoint/"
   val kafkaBrokers =""
 
   // set up the streaming execution environment
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(parallelism)
   env.enableCheckpointing(1)
   
 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   
 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   //env.setStateBackend(new FsStateBackend(checkPointPath))
 
 
   val tableEnv = StreamTableEnvironment.create(env)
 
 
 提示有多个实现:
 
 下面是pom文件:
 
 
  org.apache.flink
  flink-scala_${scala.binary.version}
  ${flink.version}
  compile
 
 
  org.apache.flink
  flink-streaming-scala_${scala.binary.version}
  ${flink.version}
  compile
 
 
  org.apache.flink
  flink-table-planner-blink_${scala.binary.version}
  ${flink.version}
  provided
 
 
 
  org.apache.flink
  flink-table-runtime-blink_2.11
  1.9.0
 
 
  org.apache.flink
  flink-connector-kafka_2.11
  1.9.0
 
 
  org.apache.flink
  flink-table-common
  ${flink.version}
  provided
 
 
 
 
 
>> 
> 





kafka消费倾斜问题

2019-08-25 文章 zq wang
大家好,请教一个问题
我的程序以kafka为数据源 去重清洗后入kafka
sink,使用的是DataStreamAPI,on-yarn模式,flink版本1.8。程序消费三个topic,以List方式传入的如下

> public FlinkKafkaConsumer010(List topics, 
> KafkaDeserializationSchema deserializer, Properties props)
>
> 三个topic因为历史原因导致每个partition数据分配不均匀。我采用了

> DataStream> dataStream =
> env.addSource(initConsumer()).name("allTopic").rebalance(); //之后keyBy ->
> process etc.

 但是实际上我的subTask并没有均匀分配,如图:
[image: image.png]
请教下会是什么原因呢?谢谢


Re: flink 1.9.0 StreamTableEnvironment 编译错误

2019-08-25 文章 Jark Wu
Hi,

关于 Expression的问题,你需要额外加入 import org.apache.flink.table.api._ 的 import。

See release note for more details: 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala
 


> 在 2019年8月26日,11:53,Zili Chen  写道:
> 
> 不应该呀,我看到仍然有 
> 
> def registerDataStream[T](name: String, dataStream: DataStream[T], fields: 
> Expression*): Unit
> 
> 这个方法的,你能提供完整一点的上下文和报错吗?
> 
> Best,
> tison.
> 
> 
> ddwcg <3149768...@qq.com > 于2019年8月26日周一 上午11:38写道:
> 感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression
> 总是感觉 java api 和scala api有点混乱了
> 
> 
> 
>> 在 2019年8月26日,11:22,Zili Chen > > 写道:
>> 
>> 试试把
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> 
>> 换成
>> 
>> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
>> 
>> 应该是意外 import 了不同包下的同名类的缘故
>> 
>> Best,
>> tison.
>> 
>> 
>> ddwcg <3149768...@qq.com > 于2019年8月26日周一 上午11:12写道:
>> 
>>> 大家好,
>>> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,
>>> 
>>> import org.apache.flink.streaming.api.CheckpointingMode
>>> import 
>>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>>> import org.apache.flink.table.planner.expressions.StddevPop
>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>> import org.apache.kafka.clients.producer.ProducerConfig
>>> 
>>> object StreamingJob {
>>>  def main(args: Array[String]) {
>>>val kafkaTopic = "source.kafka.topic"
>>>val jobName ="test"
>>>val parallelism =1
>>>val checkPointPath ="checkpoint/"
>>>val kafkaBrokers =""
>>> 
>>>// set up the streaming execution environment
>>>val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>env.setParallelism(parallelism)
>>>env.enableCheckpointing(1)
>>>
>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>
>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>//env.setStateBackend(new FsStateBackend(checkPointPath))
>>> 
>>> 
>>>val tableEnv = StreamTableEnvironment.create(env)
>>> 
>>> 
>>> 提示有多个实现:
>>> 
>>> 下面是pom文件:
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-scala_${scala.binary.version}
>>>   ${flink.version}
>>>   compile
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-streaming-scala_${scala.binary.version}
>>>   ${flink.version}
>>>   compile
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-table-planner-blink_${scala.binary.version}
>>>   ${flink.version}
>>>   provided
>>> 
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-table-runtime-blink_2.11
>>>   1.9.0
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-connector-kafka_2.11
>>>   1.9.0
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-table-common
>>>   ${flink.version}
>>>   provided
>>> 
>>> 
>>> 
>>> 
>>> 
> 



Re: flink1.9 blink planner table ddl 使用问题

2019-08-25 文章 Jark Wu
Hi,

初步看下来你的 DDL 中有这几部分定义的有问题。

1. 缺少format properties
2. 缺少 connector.version
3. bootstrap.severs 的配置方式写的不对...


你可以参考下面这个作为example:


CREATE TABLE kafka_json_source (
rowtime TIMESTAMP,
user_name VARCHAR,
event ROW
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test-json',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);


Kafka 中的数据长这个样子:

{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { 
"message_type": "WARNING", "message": "This is a warning."}}


Best,
Jark


> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道:
> 
> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
> 需要实现TableSourceFactory,还是其他什么.
> 
> 
> 提示:  
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. findAndCreateTableSource failed.
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> 
> 
> 
> 
> 代码:
> ```
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.table.api.{EnvironmentSettings, Types}
> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
> import org.apache.flink.types.Row
> 
> 
> object KafkaInDDL extends App {
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val settings: EnvironmentSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
> settings)
> 
> 
>  val sourceDDL =
>"""create table sourceTable(
>id int,
>name varchar
>  ) with (
>'connector.type' = 'kafka',
>'connector.property-version' = '1',
>'update-mode' = 'append',
>'bootstrap.servers' = '192.168.1.160:19092',
>'connector.topic' = 'hbtest1',
>'connector.startup-mode' = 'earliest-offset'
>  )
>"""
>  tEnv.sqlUpdate(sourceDDL)
>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>  tEnv.execute("")
> }
> ```



Re: FLINK TABLE API 自定义聚合函数UDAF从check point恢复任务报状态序列化不兼容问题( For heap backends, the new state serializer must not be incompatible)

2019-08-25 文章 Jark Wu
Hi Qi,

你在checkpoint 恢复任务前,有修改过 UDAF 的 ACC 结构吗?

另外,如果可以的话,最好也能带上 UDAF 的实现,和 SQL。 

Best,
Jark

> 在 2019年8月23日,11:08,orlando qi  写道:
> 
> 
> at



Re: flink 1.9.0 StreamTableEnvironment 编译错误

2019-08-25 文章 Zili Chen
不应该呀,我看到仍然有

def registerDataStream[T](name: String, dataStream: DataStream[T], fields:
Expression*): Unit

这个方法的,你能提供完整一点的上下文和报错吗?

Best,
tison.


ddwcg <3149768...@qq.com> 于2019年8月26日周一 上午11:38写道:

> 感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression
> 总是感觉 java api 和scala api有点混乱了
>
>
> 在 2019年8月26日,11:22,Zili Chen  写道:
>
> 试试把
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>
> 换成
>
> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
>
> 应该是意外 import 了不同包下的同名类的缘故
>
> Best,
> tison.
>
>
> ddwcg <3149768...@qq.com> 于2019年8月26日周一 上午11:12写道:
>
> 大家好,
> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,
>
> import org.apache.flink.streaming.api.CheckpointingMode
> import
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.planner.expressions.StddevPop
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients.producer.ProducerConfig
>
> object StreamingJob {
>  def main(args: Array[String]) {
>val kafkaTopic = "source.kafka.topic"
>val jobName ="test"
>val parallelism =1
>val checkPointPath ="checkpoint/"
>val kafkaBrokers =""
>
>// set up the streaming execution environment
>val env = StreamExecutionEnvironment.getExecutionEnvironment
>env.setParallelism(parallelism)
>env.enableCheckpointing(1)
>
>
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>
>
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>//env.setStateBackend(new FsStateBackend(checkPointPath))
>
>
>val tableEnv = StreamTableEnvironment.create(env)
>
>
> 提示有多个实现:
>
> 下面是pom文件:
>
> 
>   org.apache.flink
>   flink-scala_${scala.binary.version}
>   ${flink.version}
>   compile
> 
> 
>   org.apache.flink
>   flink-streaming-scala_${scala.binary.version}
>   ${flink.version}
>   compile
> 
> 
>   org.apache.flink
>
>   flink-table-planner-blink_${scala.binary.version}
>   ${flink.version}
>   provided
> 
> 
> 
>   org.apache.flink
>   flink-table-runtime-blink_2.11
>   1.9.0
> 
> 
>   org.apache.flink
>   flink-connector-kafka_2.11
>   1.9.0
> 
> 
>   org.apache.flink
>   flink-table-common
>   ${flink.version}
>   provided
> 
>
>
>
>
>
>


Re: flink 1.9.0 StreamTableEnvironment 编译错误

2019-08-25 文章 ddwcg
感谢您的回复,确实是这个原因。但是后面注册表的时候不能使用 Expression
总是感觉 java api 和scala api有点混乱了



> 在 2019年8月26日,11:22,Zili Chen  写道:
> 
> 试试把
> 
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> 
> 换成
> 
> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
> 
> 应该是意外 import 了不同包下的同名类的缘故
> 
> Best,
> tison.
> 
> 
> ddwcg <3149768...@qq.com> 于2019年8月26日周一 上午11:12写道:
> 
>> 大家好,
>> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,
>> 
>> import org.apache.flink.streaming.api.CheckpointingMode
>> import 
>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>> import org.apache.flink.table.planner.expressions.StddevPop
>> import org.apache.kafka.clients.consumer.ConsumerConfig
>> import org.apache.kafka.clients.producer.ProducerConfig
>> 
>> object StreamingJob {
>>  def main(args: Array[String]) {
>>val kafkaTopic = "source.kafka.topic"
>>val jobName ="test"
>>val parallelism =1
>>val checkPointPath ="checkpoint/"
>>val kafkaBrokers =""
>> 
>>// set up the streaming execution environment
>>val env = StreamExecutionEnvironment.getExecutionEnvironment
>>env.setParallelism(parallelism)
>>env.enableCheckpointing(1)
>>
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>//env.setStateBackend(new FsStateBackend(checkPointPath))
>> 
>> 
>>val tableEnv = StreamTableEnvironment.create(env)
>> 
>> 
>> 提示有多个实现:
>> 
>> 下面是pom文件:
>> 
>> 
>>   org.apache.flink
>>   flink-scala_${scala.binary.version}
>>   ${flink.version}
>>   compile
>> 
>> 
>>   org.apache.flink
>>   flink-streaming-scala_${scala.binary.version}
>>   ${flink.version}
>>   compile
>> 
>> 
>>   org.apache.flink
>>   flink-table-planner-blink_${scala.binary.version}
>>   ${flink.version}
>>   provided
>> 
>> 
>> 
>>   org.apache.flink
>>   flink-table-runtime-blink_2.11
>>   1.9.0
>> 
>> 
>>   org.apache.flink
>>   flink-connector-kafka_2.11
>>   1.9.0
>> 
>> 
>>   org.apache.flink
>>   flink-table-common
>>   ${flink.version}
>>   provided
>> 
>> 
>> 
>> 
>> 



Re: flink 1.9.0 StreamTableEnvironment 编译错误

2019-08-25 文章 Zili Chen
试试把

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

换成

import org.apache.flink.table.api.scala.StreamExecutionEnvironment

应该是意外 import 了不同包下的同名类的缘故

Best,
tison.


ddwcg <3149768...@qq.com> 于2019年8月26日周一 上午11:12写道:

> 大家好,
> 我周末升级到了1.9.0,但是在初始化table env的时候编译不通过,请大家帮忙看看是什么原因,
>
> import org.apache.flink.streaming.api.CheckpointingMode
> import 
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.planner.expressions.StddevPop
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients.producer.ProducerConfig
>
> object StreamingJob {
>   def main(args: Array[String]) {
> val kafkaTopic = "source.kafka.topic"
> val jobName ="test"
> val parallelism =1
> val checkPointPath ="checkpoint/"
> val kafkaBrokers =""
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(parallelism)
> env.enableCheckpointing(1)
> 
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> 
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> //env.setStateBackend(new FsStateBackend(checkPointPath))
>
>
> val tableEnv = StreamTableEnvironment.create(env)
>
>
> 提示有多个实现:
>
> 下面是pom文件:
>
> 
>org.apache.flink
>flink-scala_${scala.binary.version}
>${flink.version}
>compile
> 
> 
>org.apache.flink
>flink-streaming-scala_${scala.binary.version}
>${flink.version}
>compile
> 
> 
>org.apache.flink
>flink-table-planner-blink_${scala.binary.version}
>${flink.version}
>provided
> 
> 
> 
>org.apache.flink
>flink-table-runtime-blink_2.11
>1.9.0
> 
> 
>org.apache.flink
>flink-connector-kafka_2.11
>1.9.0
> 
> 
>org.apache.flink
>flink-table-common
>${flink.version}
>provided
> 
>
>
>
>


Re: 任务内存增长

2019-08-25 文章 Yun Tang
hi 张坤

使用的是RocksDBStateBackend么,一般被YARN的node manager内存超用而kill是native 内存超用导致的。可以在Flink 
参数env.java.opts.taskmanager里面加上 -XX:NativeMemoryTracking=detail 
[1],这样可以观察内存是否增长。另外你使用的内存配置和被kill时候的YARN的日志分别是什么呢,可以考虑增大JVM heap 
申请的资源来变相加大向YARN申请的总内存,某种程度上可以缓解被kill的概率。


[1] 
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html

祝好
唐云

From: 张坤 
Sent: Monday, August 26, 2019 10:45
To: user-zh@flink.apache.org 
Subject: 任务内存增长

Hi:

   最近在使用Flink(1.7.2)提交任务到yarn(per 
job),任务在yarn上运行几个小时就会被kill掉,观察到任务的内存一直在增长,任务提交时有内存参数设置,任务逻辑为kafka数据简单处理后,注册成table,使用窗口聚合,

大家有没有遇到类似的问题,原因是什么?怎么解决或者优化?谢谢!



FLINK WEEKLY 2019/34

2019-08-25 文章 Zili Chen
很高兴和各位分享 FLINK 社区上周的发展。上周 FLINK 1.9.0[1]
正式发布了,本次发布的重大更新包括细粒度的恢复机制(FLIP-1)、State 处理 API(FLIP-43)、提供强一致性保证的
stop-with-savepoint(FLIP-43)以及利用 Angular 7.x 重写的 FLINK WebUI
等。此外,本次发布还包括了一系列正在开发中的供用户预览的特性,例如 Blink 的 SQL Query Processor,Hive 的整合,以及新的
Python Table API(FLIP-38)。欢迎大家下载 FLINK 1.9.0 尝试新功能!

同上次 WEEKLY[2]一样,FLINK WEEKLY 分为 USER、DEV 和 NEWS
三个部分,分别关注到用户问题的解答、社区开发的进展和社区的新闻。

[0] https://zhuanlan.zhihu.com/p/79781544
[1] https://flink.apache.org/news/2019/08/22/release-1.9.0.html
[2] https://zhuanlan.zhihu.com/p/78753149

@USER

[3] build Flink master brach fail due to npm

从源码 build FLINK 项目的时候,有时候会由于 npm 的问题(通常是网络问题)导致 build 速度慢或 build 失败,由于 npm
仅用于 build FLINK 的 WebUI,可以通过向 maven 传递参数 `-Dskip.npm` 来跳过 npm 的过程,减少 build
的时间。

[4] Flink Kafka Connector相关问题

FLINK 和 Kafka 协作时的一致性保证问题,相关的几个 offset 的定义和理解。

[5] flink1.9.0 LOCAL_WEBSERVER 问题

如果要在自己的项目中要使用 FLINK 的 WebUI,需要依赖 `filnk-runtime-web` 项目,大部分 REST 相关功能仅依赖
`flink-runtime` 但是小部分 REST 接口以及 Angular 开发的 WebUI 均依赖于 `flink-runtime-web`。

[6] processing avro data source using DataSet API and output to parquet

如何使用 FLINK 与 avro 和 parquet 协作?

[7] Using S3 as a sink (StreamingFileSink)

用户在将 S3 作为 `StreamingFileSink` 连接到 FLINK 之后发现无法从 savepoint 当中恢复作业,这可能与 S3
管理文件的策略有关。

[8] Issue with FilterableTableSource and the logical optimizer rules

FilterableTableSource 的使用过程中 CALCITE 引发的作业失败,社区 Committer 提供了一种 workaround
但是完整的修复还在进行中。

[9] Recovery from job manager crash using check points

FLINK 的 job graph store 和 checkpoint store 分别在 JobManager 和 TaskManager
挂掉的情况下提供高可用机制,根据实现的不同可能需要依赖 ZooKeeper 集群来存储元数据。

[10] Can I use watermarkers to have a global trigger of different
ProcessFunction's?

FLINK 中 watermark 的语义和正确的使用姿势。

[11] Externalized checkpoints

External checkpoint 使用过程中 retain 的策略和清理的策略设置。

[12] [SURVEY] How do you use high-availability services in Flink?

基于正在进行的 FLINK 高可用机制重构工作(FLINK-10333),我发起了一个用户使用 FLINK
的高可用机制的调查,欢迎参与到调查中介绍你的使用方式。

[3]
https://lists.apache.org/x/thread.html/3d983f5c49b88a316a2e13fdefa10548584c6e841270923033962dc0@%3Cuser-zh.flink.apache.org%3E
[4]
https://lists.apache.org/x/thread.html/b64e1cd6fc02239589fe3a316293b07ad47ab84f8f62b96b9198b8dc@%3Cuser-zh.flink.apache.org%3E
[5]
https://lists.apache.org/x/thread.html/2f6e5624079ecb09b18affc18ebf9dce2abba8ecb701657c84043e27@%3Cuser-zh.flink.apache.org%3E
[6]
https://lists.apache.org/x/thread.html/9349327ab7f130bcaca1b4c3515fcfc6b89b12ac2fac53540cc996df@%3Cuser.flink.apache.org%3E
[7]
https://lists.apache.org/x/thread.html/a23cb1b0247bb3d9206717bf99c735e11ffe3548fe58fdee8fb96ccc@%3Cuser.flink.apache.org%3E
[8]
https://lists.apache.org/x/thread.html/69dca869019f39c469907eb23f5dba02696d8fc1fd8ba86d870282e6@%3Cuser.flink.apache.org%3E
[9]
https://lists.apache.org/x/thread.html/598f3c6d92c316a78e28c8aefb6aa5a00ddea6cdf2dd2c937d635275@%3Cuser.flink.apache.org%3E
[10]
https://lists.apache.org/x/thread.html/3bebd6e6bb3a11eeb3bc5d5943b7bfce333b737cae3419ebab6490ea@%3Cuser.flink.apache.org%3E
[11]
https://lists.apache.org/x/thread.html/166f9e21411a1c3b8d749b9b51875f9ff7a1a497debd35603243144a@%3Cuser.flink.apache.org%3E
[12]
https://lists.apache.org/x/thread.html/c0cc07197e6ba30b45d7709cc9e17d8497e5e3f33de504d58dfcafad@%3Cuser.flink.apache.org%3E

@DEV

[13] [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

Timo Walther 发起了 FLIP-55 的讨论,旨在为 Table API 提供一个 Expression DSL 以方便用户编写程序。

[14] [DISCUSS] FLIP-56: Dynamic Slot Allocation

Xintong Song 发起了 FLIP-56 的讨论,旨在提供动态的 Slot 分配策略以更好的利用集群的资源。

[15] [DISCUSS] Upgrade kinesis connector to Apache 2.0 License and include
it in official release

更新 Kinesis 连接器的版本使得其 License 为 Apache 2.0 License,此后 FLINK 就可以在发布中直接包含
Kinesis 连接器。

[16] Support disk spilling in HeapKeyedStateBackend

Yu Li 提出的 FLIP-50: Spill-able Heap Keyed State Backend

进入开发阶段。

[17] [DISCUSS] Enhance Support for Multicast Communication Pattern

Yun Gao 发起了改进 FLINK 多播通信模式的讨论,这一改进旨在支持算子间更复杂的通信。

[18] CiBot Update

Chesnay Schepler 为 FLINK 的 pull request 机器人增加了发送 comment 重新跑测试的功能,这一功能在
ZooKeeper 等社区中也被广泛实现,此后 contributor 不用再通过发送空 commit 或关闭再打开 pull request
来触发重新跑测试。

[19] [DISCUSS] Use Java's Duration instead of Flink's Time

Stephan Ewen 发起了使用 Java 8 的 `Duration` 替代 FLINK 的 `Time` 的讨论,此前 FLINK
有两个简单实现的 `Time` 类来表示 runtime 中处理一段时间的概念,这经常引起开发者和用户的困惑。

[13]
https://lists.apache.org/x/thread.html/eb5e7b0579e5f1da1e9bf1ab4e4b86dba737946f0261d94d8c30521e@%3Cdev.flink.apache.org%3E
)
[14]
https://lists.apache.org/x/thread.html/72e5c211fb39ac1c596e12ae096d593ca30118dc12dcf664b7538624@%3Cdev.flink.apache.org%3E
[15]
https://lists.apache.org/x/thread.html/3876eec7aced42d2ac28728bc5084980ed7bf8ca6a6a8ed56e01e387@%3Cdev.flink.apache.org%3E
[16] https://issues.apache.org/jira/browse/FLINK-12692
[17]
https://lists.apache.org/x/thread.html/06834937769fda7c7afa4114e4f2f4ec84d95a54cc6ec46a5aa839de@%3Cdev.flink.apache.org%3E
[18]

任务内存增长

2019-08-25 文章 张坤
Hi:

   最近在使用Flink(1.7.2)提交任务到yarn(per 
job),任务在yarn上运行几个小时就会被kill掉,观察到任务的内存一直在增长,任务提交时有内存参数设置,任务逻辑为kafka数据简单处理后,注册成table,使用窗口聚合,

大家有没有遇到类似的问题,原因是什么?怎么解决或者优化?谢谢!



flink1.9 blink planner table ddl 使用问题

2019-08-25 文章 hb
flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
需要实现TableSourceFactory,还是其他什么.


提示:  
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.




代码:
```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.{EnvironmentSettings, Types}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row


object KafkaInDDL extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
settings)


  val sourceDDL =
"""create table sourceTable(
id int,
name varchar
  ) with (
'connector.type' = 'kafka',
'connector.property-version' = '1',
'update-mode' = 'append',
'bootstrap.servers' = '192.168.1.160:19092',
'connector.topic' = 'hbtest1',
'connector.startup-mode' = 'earliest-offset'
  )
"""
  tEnv.sqlUpdate(sourceDDL)
  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
  tEnv.execute("")
}
```