Re:Re: flink1.9 blink planner table ddl usage question

2019-08-25 Thread hb
I used your DDL statement but still get the same error.
I am running it in IDEA, with the dependencies configured through Maven.

On 2019-08-26 11:22:20, "Jark Wu" wrote:
>Hi,
>
>At first glance, the following parts of your DDL definition look problematic.
>
>1. The format properties are missing
>2. connector.version is missing
>3. bootstrap.servers is configured in the wrong way...
>
>
>You can use the following as an example:
>
>
>CREATE TABLE kafka_json_source (
>rowtime TIMESTAMP,
>user_name VARCHAR,
>event ROW
>) WITH (
>'connector.type' = 'kafka',
>'connector.version' = 'universal',
>'connector.topic' = 'test-json',
>'connector.startup-mode' = 'earliest-offset',
>'connector.properties.0.key' = 'zookeeper.connect',
>'connector.properties.0.value' = 'localhost:2181',
>'connector.properties.1.key' = 'bootstrap.servers',
>'connector.properties.1.value' = 'localhost:9092',
>'update-mode' = 'append',
>'format.type' = 'json',
>'format.derive-schema' = 'true'
>);
>
>
>The data in Kafka looks like this:
>
>{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { 
>"message_type": "WARNING", "message": "This is a warning."}}
>
>
>Best,
>Jark
>
>
>> On Aug 26, 2019, at 09:52, hb <343122...@163.com> wrote:
>> 
>> Using a DDL statement with the flink 1.9 blink planner Table API, creating the table does not succeed.
>> I am not sure whether some definition properties are missing, whether I need to implement a TableSourceFactory, or whether it is something else.
>> 
>> 
>> Error message:  
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> SQL validation failed. findAndCreateTableSource failed.
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
>> not find a suitable table factory for 
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>> 
>> 
>> 
>> 
>> Code:
>> ```
>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>> import org.apache.flink.types.Row
>> 
>> 
>> object KafkaInDDL extends App {
>>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>>  val settings: EnvironmentSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
>> settings)
>> 
>> 
>>  val sourceDDL =
>>"""create table sourceTable(
>>id int,
>>name varchar
>>  ) with (
>>'connector.type' = 'kafka',
>>'connector.property-version' = '1',
>>'update-mode' = 'append',
>>'bootstrap.servers' = '192.168.1.160:19092',
>>'connector.topic' = 'hbtest1',
>>'connector.startup-mode' = 'earliest-offset'
>>  )
>>"""
>>  tEnv.sqlUpdate(sourceDDL)
>>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>  tEnv.execute("")
>> }
>> ```
>
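For reference: this NoMatchingTableFactoryException for 'TableSourceFactory' is usually a classpath problem rather than a DDL problem. The factories for the Kafka connector and the JSON format have to be on the classpath of the program that executes the DDL. A minimal sketch of the Maven dependencies such a Kafka-plus-JSON source typically needs when run from the IDE (artifact names and versions are assumptions for Flink 1.9.0, please verify against the documentation):

```
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.9.0</version>
</dependency>
```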


Re: flink 1.9.0 StreamTableEnvironment compilation error

2019-08-25 Thread ddwcg
After applying both of your suggestions it works now, thanks to you both.

> On Aug 26, 2019, at 12:28, Jark Wu wrote:
> 
> Hi,
> 
> Regarding the Expression problem, you additionally need to add the import org.apache.flink.table.api._ .
> 
> See release note for more details: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala
>  
> 
> 
>> On Aug 26, 2019, at 11:53, Zili Chen wrote:
>> 
>> That shouldn't happen; I can still see that the method 
>> 
>> def registerDataStream[T](name: String, dataStream: DataStream[T], fields: 
>> Expression*): Unit
>> 
>> is still there. Could you provide a more complete context and the error message?
>> 
>> Best,
>> tison.
>> 
>> 
>> ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:38 AM:
>> Thanks for your reply, that was indeed the cause. But later, when registering the table, I cannot use Expression.
>> It always feels like the Java API and the Scala API are getting a bit mixed up.
>> 
>> 
>> 
>>> On Aug 26, 2019, at 11:22, Zili Chen wrote:
>>> 
>>> Try replacing
>>> 
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> 
>>> with
>>> 
>>> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
>>> 
>>> It is probably because a class with the same name from a different package was imported by accident.
>>> 
>>> Best,
>>> tison.
>>> 
>>> 
>>> ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:12 AM:
>>> 
 Hi everyone,
 I upgraded to 1.9.0 over the weekend, but compilation fails when initializing the table env. Could you help me figure out the cause?
 
 import org.apache.flink.streaming.api.CheckpointingMode
 import 
 org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
 import 
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 import org.apache.flink.table.planner.expressions.StddevPop
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 
 object StreamingJob {
 def main(args: Array[String]) {
   val kafkaTopic = "source.kafka.topic"
   val jobName ="test"
   val parallelism =1
   val checkPointPath ="checkpoint/"
   val kafkaBrokers =""
 
   // set up the streaming execution environment
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(parallelism)
   env.enableCheckpointing(1)
   
 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   
 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   //env.setStateBackend(new FsStateBackend(checkPointPath))
 
 
   val tableEnv = StreamTableEnvironment.create(env)
 
 
 The compiler complains that there are multiple implementations:
 
 Below is the pom file:
 
 
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-scala_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>compile</scope>
 </dependency>
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>compile</scope>
 </dependency>
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
 </dependency>
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-runtime-blink_2.11</artifactId>
   <version>1.9.0</version>
 </dependency>
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_2.11</artifactId>
   <version>1.9.0</version>
 </dependency>
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-common</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
 </dependency>
 
 
 
 
>> 
> 
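To summarize the fixes from this thread, here is a minimal sketch of the imports that usually make a Flink 1.9 Scala Table API program compile, including the Expression DSL import mentioned in the release note linked above (an illustration, not the exact code from the thread):

```
import org.apache.flink.streaming.api.scala._   // Scala StreamExecutionEnvironment
import org.apache.flink.table.api._             // Expression DSL (needed in 1.9, see release note)
import org.apache.flink.table.api.scala._       // Scala StreamTableEnvironment and conversions

object TableEnvExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    // register streams here, e.g. tableEnv.registerDataStream("t", stream, 'id, 'name)
  }
}
```

With the Scala StreamExecutionEnvironment imported (instead of the Java one from org.apache.flink.streaming.api.environment), the ambiguity reported above should go away.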





Re: type error with generics ..

2019-08-25 Thread Debasish Ghosh
oh .. and I am using Flink 1.8 ..

On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh 
wrote:

> Thanks for the feedback .. here are the details ..
>
> Just to give you some background, the original API is a Scala API as follows
> ..
>
> final def readStream[In: TypeInformation: DeserializationSchema](inlet:
> CodecInlet[In]): DataStream[In] =
> context.readStream(inlet)
>
> and the *Scala version of the code runs fine* .. Here's the Java API
> (also written in Scala though but passing type information and
> deserialization schema explicitly and using the DataStream class from Flink
> Java) ..
>
> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In],
> deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
> context.readStream(inlet)(TypeInformation.of[In](clazz),
> deserializationSchema)
>   .javaStream
>
> Here's the Java code for transformation where I get the error ..
>
> DataStream ins =
>   this.readStream(in, Data.class, serdeData)
>   .map((Data d) -> d)
>   .returns(new TypeHint(){}.getTypeInfo());
>
> DataStream simples = ins.map((Data d) -> new Simple(d.getName()));
> // .returns(new TypeHint(){}.getTypeInfo());
> DataStreamSink sink = writeStream(out, simples, Simple.class,
> serdeSimple);
>
> Here's the corresponding Scala code that runs fine ..
>
> val ins: DataStream[Data] = readStream(in)
> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
> writeStream(out, simples)
>
> Here's the custom source that's also referred in the exception .. the case
> class is directly used in Scala while I use the Java API that uses that
> case class from Java ..
>
> object FlinkSource {
>   case class CollectionSourceFunction[T](data: Seq[T]) extends
> SourceFunction[T] {
> def cancel(): Unit = {}
> def run(ctx: SourceContext[T]): Unit = {
>   data.foreach(d ⇒ ctx.collect(d))
> }
>   }
>
>   /**
>* Java API
>*/
>   def collectionSourceFunction[T](data: java.util.List[T]) =
> CollectionSourceFunction(data.asScala.toSeq)
> }
>
> Here's how I use the custom source from Java .. (which gives exception) ..
> here data is a java.util.List
>
> env.addSource(
>   FlinkSource.collectionSourceFunction(data)
> )
>
> and here's the Scala version, which runs fine .. here data is a
> scala.Seq[Data]
>
> env.addSource(FlinkSource.CollectionSourceFunction(data))
>
> Here's the complete exception ..
>
> [info]   org.apache.flink.api.common.functions.InvalidTypesException: The
> return type of function 'Custom Source' could not be determined
> automatically, due to type erasure. You can give type information hints by
> using the returns(...) method on the result of the transformation call, or
> by letting your function implement the 'ResultTypeQueryable' interface.
> [info]   at
> org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
> [info]   at
> org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
> [info]   at
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
> [info]   at
> pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
> [info]   at
> pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
> [info]   at
> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
> [info]   at
> pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
> [info]   at
> pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
> [info]   at
> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
> [info]   ...
> [info]   Cause:
> org.apache.flink.api.common.functions.InvalidTypesException: Type of
> TypeVariable 'T' in 'class
> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
> determined. This is most likely a type erasure problem. The type extraction
> currently supports types with generic variables only in cases where all
> variables in the return type can be deduced from the input type(s).
> Otherwise the type has to be specified explicitly using type information.
> [info]   at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
> [info]   at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
> [info]   at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
> [info]   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
> [info]   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
> [info]   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
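A possible workaround (a sketch, not necessarily what was done here): the type extractor cannot deduce T of the generic Scala SourceFunction when it is called from Java, but StreamExecutionEnvironment.addSource has an overload that takes the TypeInformation explicitly, which skips extraction on the source altogether. Assuming env, data, Data and FlinkSource are as in the message above:

```
// Sketch: supply the element type explicitly so no reflection on the generic
// Scala SourceFunction is needed (Data is the element class from this thread).
DataStream<Data> ins = env.addSource(
    FlinkSource.collectionSourceFunction(data),
    TypeInformation.of(Data.class));
```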

Kafka consumption skew problem

2019-08-25 Thread zq wang
Hi everyone, I have a question.
My program uses Kafka as the data source; after deduplication and cleansing the data is written to a Kafka
sink. I use the DataStream API, on-yarn mode, Flink version 1.8. The program consumes three topics, passed in as a List as follows:

> public FlinkKafkaConsumer010(List topics, 
> KafkaDeserializationSchema deserializer, Properties props)
>
> For historical reasons, the data is distributed unevenly across the partitions of the three topics. I used

> DataStream> dataStream =
> env.addSource(initConsumer()).name("allTopic").rebalance(); // followed by keyBy ->
> process etc.

 But in practice my subtasks do not get an even share, as shown in the image:
[image: image.png]
What could be the reason? Thanks.
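A note on where such skew can come from (a sketch of common causes, not a diagnosis of this particular job): rebalance() only redistributes records between the source and the next operator; it does not change which source subtask reads which partition, and a later keyBy re-partitions by key hash again, so hot keys can re-introduce skew after the rebalance. A hedged example of making the source-side assignment easier to reason about (the partition count and the Tuple2 element type are made-up assumptions):

```
// Sketch: if the three topics have, say, 16 partitions in total, a source
// parallelism of 16 gives each source subtask exactly one partition, so
// per-partition skew maps one-to-one onto source subtasks and is easy to spot.
// rebalance() then evens out records for the next (non-keyed) operator only.
DataStream<Tuple2<String, String>> dataStream = env
        .addSource(initConsumer())
        .name("allTopic")
        .setParallelism(16)   // hypothetical: total partition count of the 3 topics
        .rebalance();
```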


Re: flink 1.9.0 StreamTableEnvironment compilation error

2019-08-25 Thread Jark Wu
Hi,

Regarding the Expression problem, you additionally need to add the import org.apache.flink.table.api._ .

See release note for more details: 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html#scala-expression-dsl-for-table-api-moved-to-flink-table-api-scala
 


> On Aug 26, 2019, at 11:53, Zili Chen wrote:
> 
> That shouldn't happen; I can still see that the method 
> 
> def registerDataStream[T](name: String, dataStream: DataStream[T], fields: 
> Expression*): Unit
> 
> is still there. Could you provide a more complete context and the error message?
> 
> Best,
> tison.
> 
> 
> ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:38 AM:
> Thanks for your reply, that was indeed the cause. But later, when registering the table, I cannot use Expression.
> It always feels like the Java API and the Scala API are getting a bit mixed up.
> 
> 
> 
>> On Aug 26, 2019, at 11:22, Zili Chen wrote:
>> 
>> Try replacing
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> 
>> with
>> 
>> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
>> 
>> It is probably because a class with the same name from a different package was imported by accident.
>> 
>> Best,
>> tison.
>> 
>> 
>> ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:12 AM:
>> 
>>> Hi everyone,
>>> I upgraded to 1.9.0 over the weekend, but compilation fails when initializing the table env. Could you help me figure out the cause?
>>> 
>>> import org.apache.flink.streaming.api.CheckpointingMode
>>> import 
>>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>>> import org.apache.flink.table.planner.expressions.StddevPop
>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>> import org.apache.kafka.clients.producer.ProducerConfig
>>> 
>>> object StreamingJob {
>>>  def main(args: Array[String]) {
>>>val kafkaTopic = "source.kafka.topic"
>>>val jobName ="test"
>>>val parallelism =1
>>>val checkPointPath ="checkpoint/"
>>>val kafkaBrokers =""
>>> 
>>>// set up the streaming execution environment
>>>val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>env.setParallelism(parallelism)
>>>env.enableCheckpointing(1)
>>>
>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>
>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>//env.setStateBackend(new FsStateBackend(checkPointPath))
>>> 
>>> 
>>>val tableEnv = StreamTableEnvironment.create(env)
>>> 
>>> 
>>> The compiler complains that there are multiple implementations:
>>> 
>>> Below is the pom file:
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-scala_${scala.binary.version}
>>>   ${flink.version}
>>>   compile
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-streaming-scala_${scala.binary.version}
>>>   ${flink.version}
>>>   compile
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-table-planner-blink_${scala.binary.version}
>>>   ${flink.version}
>>>   provided
>>> 
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-table-runtime-blink_2.11
>>>   1.9.0
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-connector-kafka_2.11
>>>   1.9.0
>>> 
>>> 
>>>   org.apache.flink
>>>   flink-table-common
>>>   ${flink.version}
>>>   provided
>>> 
>>> 
>>> 
>>> 
>>> 
> 



Re: flink1.9 blink planner table ddl usage question

2019-08-25 Thread Jark Wu
Hi,

At first glance, the following parts of your DDL definition look problematic.

1. The format properties are missing
2. connector.version is missing
3. bootstrap.servers is configured in the wrong way...


You can use the following as an example:


CREATE TABLE kafka_json_source (
rowtime TIMESTAMP,
user_name VARCHAR,
event ROW
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test-json',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);


The data in Kafka looks like this:

{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { 
"message_type": "WARNING", "message": "This is a warning."}}


Best,
Jark


> On Aug 26, 2019, at 09:52, hb <343122...@163.com> wrote:
> 
> Using a DDL statement with the flink 1.9 blink planner Table API, creating the table does not succeed.
> I am not sure whether some definition properties are missing, whether I need to implement a TableSourceFactory, or whether it is something else.
> 
> 
> Error message:  
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. findAndCreateTableSource failed.
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> 
> 
> 
> 
> Code:
> ```
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.table.api.{EnvironmentSettings, Types}
> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
> import org.apache.flink.types.Row
> 
> 
> object KafkaInDDL extends App {
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val settings: EnvironmentSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
> settings)
> 
> 
>  val sourceDDL =
>"""create table sourceTable(
>id int,
>name varchar
>  ) with (
>'connector.type' = 'kafka',
>'connector.property-version' = '1',
>'update-mode' = 'append',
>'bootstrap.servers' = '192.168.1.160:19092',
>'connector.topic' = 'hbtest1',
>'connector.startup-mode' = 'earliest-offset'
>  )
>"""
>  tEnv.sqlUpdate(sourceDDL)
>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>  tEnv.execute("")
> }
> ```



Re: FLINK TABLE API custom aggregate function (UDAF): restoring the job from a checkpoint reports a state serializer incompatibility (For heap backends, the new state serializer must not be incompatible)

2019-08-25 Thread Jark Wu
Hi Qi,

Before restoring the job from the checkpoint, did you change the ACC (accumulator) structure of the UDAF?

Also, if possible, it would be best to include the UDAF implementation and the SQL as well.

Best,
Jark

> On Aug 23, 2019, at 11:08, orlando qi wrote:
> 
> 
> at
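For context on what the "ACC structure" refers to, a made-up minimal UDAF sketch (not the reporter's code): the accumulator class is what gets stored in keyed state, so adding, removing or renaming its fields after a checkpoint was taken changes the state serializer and can produce exactly this incompatibility error on restore.

```
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator: changing this class between checkpoint and restore
// changes the serializer of the aggregation state.
case class CountAcc(var count: Long)

class MyCount extends AggregateFunction[Long, CountAcc] {
  override def createAccumulator(): CountAcc = CountAcc(0L)
  override def getValue(acc: CountAcc): Long = acc.count
  def accumulate(acc: CountAcc, in: String): Unit = {
    if (in != null) acc.count += 1
  }
}
```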



Re: flink 1.9.0 StreamTableEnvironment compilation error

2019-08-25 Thread Zili Chen
That shouldn't happen; I can still see that the method

def registerDataStream[T](name: String, dataStream: DataStream[T], fields:
Expression*): Unit

is still there. Could you provide a more complete context and the error message?

Best,
tison.


ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:38 AM:

> Thanks for your reply, that was indeed the cause. But later, when registering the table, I cannot use Expression.
> It always feels like the Java API and the Scala API are getting a bit mixed up.
>
>
> On Aug 26, 2019, at 11:22, Zili Chen wrote:
>
> Try replacing
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>
> with
>
> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
>
> It is probably because a class with the same name from a different package was imported by accident.
>
> Best,
> tison.
>
>
> ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:12 AM:
>
> Hi everyone,
> I upgraded to 1.9.0 over the weekend, but compilation fails when initializing the table env. Could you help me figure out the cause?
>
> import org.apache.flink.streaming.api.CheckpointingMode
> import
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.planner.expressions.StddevPop
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients.producer.ProducerConfig
>
> object StreamingJob {
>  def main(args: Array[String]) {
>val kafkaTopic = "source.kafka.topic"
>val jobName ="test"
>val parallelism =1
>val checkPointPath ="checkpoint/"
>val kafkaBrokers =""
>
>// set up the streaming execution environment
>val env = StreamExecutionEnvironment.getExecutionEnvironment
>env.setParallelism(parallelism)
>env.enableCheckpointing(1)
>
>
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>
>
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>//env.setStateBackend(new FsStateBackend(checkPointPath))
>
>
>val tableEnv = StreamTableEnvironment.create(env)
>
>
> The compiler complains that there are multiple implementations:
>
> Below is the pom file:
>
> 
>   org.apache.flink
>   flink-scala_${scala.binary.version}
>   ${flink.version}
>   compile
> 
> 
>   org.apache.flink
>   flink-streaming-scala_${scala.binary.version}
>   ${flink.version}
>   compile
> 
> 
>   org.apache.flink
>
>   flink-table-planner-blink_${scala.binary.version}
>   ${flink.version}
>   provided
> 
> 
> 
>   org.apache.flink
>   flink-table-runtime-blink_2.11
>   1.9.0
> 
> 
>   org.apache.flink
>   flink-connector-kafka_2.11
>   1.9.0
> 
> 
>   org.apache.flink
>   flink-table-common
>   ${flink.version}
>   provided
> 
>
>
>
>
>
>


Re: flink 1.9.0 StreamTableEnvironment compilation error

2019-08-25 Thread ddwcg
Thanks for your reply, that was indeed the cause. But later, when registering the table, I cannot use Expression.
It always feels like the Java API and the Scala API are getting a bit mixed up.



> On Aug 26, 2019, at 11:22, Zili Chen wrote:
> 
> Try replacing
> 
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> 
> with
> 
> import org.apache.flink.table.api.scala.StreamExecutionEnvironment
> 
> It is probably because a class with the same name from a different package was imported by accident.
> 
> Best,
> tison.
> 
> 
> ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:12 AM:
> 
>> Hi everyone,
>> I upgraded to 1.9.0 over the weekend, but compilation fails when initializing the table env. Could you help me figure out the cause?
>> 
>> import org.apache.flink.streaming.api.CheckpointingMode
>> import 
>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.flink.table.api.scala.StreamTableEnvironment
>> import org.apache.flink.table.planner.expressions.StddevPop
>> import org.apache.kafka.clients.consumer.ConsumerConfig
>> import org.apache.kafka.clients.producer.ProducerConfig
>> 
>> object StreamingJob {
>>  def main(args: Array[String]) {
>>val kafkaTopic = "source.kafka.topic"
>>val jobName ="test"
>>val parallelism =1
>>val checkPointPath ="checkpoint/"
>>val kafkaBrokers =""
>> 
>>// set up the streaming execution environment
>>val env = StreamExecutionEnvironment.getExecutionEnvironment
>>env.setParallelism(parallelism)
>>env.enableCheckpointing(1)
>>
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>//env.setStateBackend(new FsStateBackend(checkPointPath))
>> 
>> 
>>val tableEnv = StreamTableEnvironment.create(env)
>> 
>> 
>> The compiler complains that there are multiple implementations:
>> 
>> Below is the pom file:
>> 
>> 
>>   org.apache.flink
>>   flink-scala_${scala.binary.version}
>>   ${flink.version}
>>   compile
>> 
>> 
>>   org.apache.flink
>>   flink-streaming-scala_${scala.binary.version}
>>   ${flink.version}
>>   compile
>> 
>> 
>>   org.apache.flink
>>   flink-table-planner-blink_${scala.binary.version}
>>   ${flink.version}
>>   provided
>> 
>> 
>> 
>>   org.apache.flink
>>   flink-table-runtime-blink_2.11
>>   1.9.0
>> 
>> 
>>   org.apache.flink
>>   flink-connector-kafka_2.11
>>   1.9.0
>> 
>> 
>>   org.apache.flink
>>   flink-table-common
>>   ${flink.version}
>>   provided
>> 
>> 
>> 
>> 
>> 



Re: flink 1.9.0 StreamTableEnvironment compilation error

2019-08-25 Thread Zili Chen
Try replacing

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

with

import org.apache.flink.table.api.scala.StreamExecutionEnvironment

It is probably because a class with the same name from a different package was imported by accident.

Best,
tison.


ddwcg <3149768...@qq.com> wrote on Mon, Aug 26, 2019, at 11:12 AM:

> Hi everyone,
> I upgraded to 1.9.0 over the weekend, but compilation fails when initializing the table env. Could you help me figure out the cause?
>
> import org.apache.flink.streaming.api.CheckpointingMode
> import 
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.table.planner.expressions.StddevPop
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.clients.producer.ProducerConfig
>
> object StreamingJob {
>   def main(args: Array[String]) {
> val kafkaTopic = "source.kafka.topic"
> val jobName ="test"
> val parallelism =1
> val checkPointPath ="checkpoint/"
> val kafkaBrokers =""
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(parallelism)
> env.enableCheckpointing(1)
> 
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> 
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> //env.setStateBackend(new FsStateBackend(checkPointPath))
>
>
> val tableEnv = StreamTableEnvironment.create(env)
>
>
> The compiler complains that there are multiple implementations:
>
> Below is the pom file:
>
> 
>org.apache.flink
>flink-scala_${scala.binary.version}
>${flink.version}
>compile
> 
> 
>org.apache.flink
>flink-streaming-scala_${scala.binary.version}
>${flink.version}
>compile
> 
> 
>org.apache.flink
>flink-table-planner-blink_${scala.binary.version}
>${flink.version}
>provided
> 
> 
> 
>org.apache.flink
>flink-table-runtime-blink_2.11
>1.9.0
> 
> 
>org.apache.flink
>flink-connector-kafka_2.11
>1.9.0
> 
> 
>org.apache.flink
>flink-table-common
>${flink.version}
>provided
> 
>
>
>
>


Re: Job memory keeps growing

2019-08-25 Thread Yun Tang
hi 张坤

Are you using the RocksDBStateBackend? Being killed by YARN's node manager for exceeding memory is usually caused by native memory overuse. You can add
-XX:NativeMemoryTracking=detail to the Flink option env.java.opts.taskmanager
[1] to observe whether the memory keeps growing. Also, what memory configuration are you using, and what do the YARN logs say at the moment of the kill? You could consider requesting a larger JVM heap,
which indirectly increases the total memory requested from YARN and to some extent reduces the probability of being killed.


[1] 
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html

Best regards,
Yun Tang

From: 张坤 
Sent: Monday, August 26, 2019 10:45
To: user-zh@flink.apache.org 
Subject: 任务内存增长

Hi:

   Recently I have been using Flink (1.7.2) to submit jobs to YARN (per-job mode). After running on YARN for a few hours the job gets killed, and I can see that the job's memory keeps growing. Memory parameters are set at submission time. The job logic is: Kafka data is lightly processed, registered as a table, and aggregated with windows.

Has anyone run into a similar problem? What is the cause, and how can it be solved or optimized? Thanks!
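A sketch of the configuration suggested above (flink-conf.yaml; the heap size is a made-up example, raising it simply enlarges the total container requested from YARN):

```
# Enable JVM native memory tracking on the TaskManagers, as suggested above.
env.java.opts.taskmanager: "-XX:NativeMemoryTracking=detail"
# Hypothetical example value: a larger heap also enlarges the YARN container request.
taskmanager.heap.size: 4096m
```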



FLINK WEEKLY 2019/34

2019-08-25 Thread Zili Chen
Happy to share the FLINK community's progress from last week. Last week FLINK 1.9.0 [1]
was officially released. Major updates in this release include fine-grained recovery (FLIP-1), the State Processor API (FLIP-43), stop-with-savepoint with strong
consistency guarantees (FLIP-34), and a FLINK WebUI rewritten with Angular 7.x.
In addition, the release also contains a series of preview features that are still under development, such as Blink's SQL query processor, the Hive integration, and the new
Python Table API (FLIP-38). Everyone is welcome to download FLINK 1.9.0 and try out the new features!

As with the previous WEEKLY [2], FLINK WEEKLY is divided into the three sections USER, DEV and NEWS,
covering answers to user questions, the progress of community development, and community news respectively.

[0] https://zhuanlan.zhihu.com/p/79781544
[1] https://flink.apache.org/news/2019/08/22/release-1.9.0.html
[2] https://zhuanlan.zhihu.com/p/78753149

@USER

[3] build Flink master branch fails due to npm

When building the FLINK project from source, npm problems (usually network problems) sometimes make the build slow or make it fail. Since npm
is only used to build FLINK's WebUI, you can pass the parameter `-Dskip.npm` to maven to skip the npm step and reduce the build
time.
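For example, a typical invocation (the extra flags are just a common choice, not required):

```
mvn clean install -DskipTests -Dskip.npm
```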

[4] Questions about the Flink Kafka Connector

Consistency guarantees when FLINK works with Kafka, and the definitions and understanding of the several offsets involved.

[5] flink1.9.0 LOCAL_WEBSERVER problem

If you want to use FLINK's WebUI in your own project, you need to depend on the `flink-runtime-web` project. Most REST-related functionality only depends on
`flink-runtime`, but a small number of REST endpoints, as well as the Angular-based WebUI, depend on `flink-runtime-web`.

[6] processing avro data source using DataSet API and output to parquet

How to use FLINK together with Avro and Parquet?

[7] Using S3 as a sink (StreamingFileSink)

A user found that after connecting S3 to FLINK as a `StreamingFileSink`, the job could not be restored from a savepoint; this may be related to how S3
manages files.

[8] Issue with FilterableTableSource and the logical optimizer rules

A job failure caused by CALCITE while using FilterableTableSource; a community committer provided a workaround,
but the complete fix is still in progress.

[9] Recovery from job manager crash using check points

FLINK's job graph store and checkpoint store provide high availability when the JobManager or TaskManager
goes down; depending on the implementation, a ZooKeeper cluster may be needed to store the metadata.

[10] Can I use watermarkers to have a global trigger of different
ProcessFunction's?

The semantics of watermarks in FLINK and how to use them correctly.

[11] Externalized checkpoints

Retention and cleanup policy settings when using externalized checkpoints.

[12] [SURVEY] How do you use high-availability services in Flink?

Based on the ongoing rework of FLINK's high-availability services (FLINK-10333), I started a survey on how users use FLINK's
high-availability services; you are welcome to take part in the survey and describe your usage.

[3]
https://lists.apache.org/x/thread.html/3d983f5c49b88a316a2e13fdefa10548584c6e841270923033962dc0@%3Cuser-zh.flink.apache.org%3E
[4]
https://lists.apache.org/x/thread.html/b64e1cd6fc02239589fe3a316293b07ad47ab84f8f62b96b9198b8dc@%3Cuser-zh.flink.apache.org%3E
[5]
https://lists.apache.org/x/thread.html/2f6e5624079ecb09b18affc18ebf9dce2abba8ecb701657c84043e27@%3Cuser-zh.flink.apache.org%3E
[6]
https://lists.apache.org/x/thread.html/9349327ab7f130bcaca1b4c3515fcfc6b89b12ac2fac53540cc996df@%3Cuser.flink.apache.org%3E
[7]
https://lists.apache.org/x/thread.html/a23cb1b0247bb3d9206717bf99c735e11ffe3548fe58fdee8fb96ccc@%3Cuser.flink.apache.org%3E
[8]
https://lists.apache.org/x/thread.html/69dca869019f39c469907eb23f5dba02696d8fc1fd8ba86d870282e6@%3Cuser.flink.apache.org%3E
[9]
https://lists.apache.org/x/thread.html/598f3c6d92c316a78e28c8aefb6aa5a00ddea6cdf2dd2c937d635275@%3Cuser.flink.apache.org%3E
[10]
https://lists.apache.org/x/thread.html/3bebd6e6bb3a11eeb3bc5d5943b7bfce333b737cae3419ebab6490ea@%3Cuser.flink.apache.org%3E
[11]
https://lists.apache.org/x/thread.html/166f9e21411a1c3b8d749b9b51875f9ff7a1a497debd35603243144a@%3Cuser.flink.apache.org%3E
[12]
https://lists.apache.org/x/thread.html/c0cc07197e6ba30b45d7709cc9e17d8497e5e3f33de504d58dfcafad@%3Cuser.flink.apache.org%3E

@DEV

[13] [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

Timo Walther started the discussion of FLIP-55, which aims to provide an Expression DSL for the Table API to make it easier for users to write programs.

[14] [DISCUSS] FLIP-56: Dynamic Slot Allocation

Xintong Song started the discussion of FLIP-56, which aims to provide a dynamic slot allocation strategy to make better use of cluster resources.

[15] [DISCUSS] Upgrade kinesis connector to Apache 2.0 License and include
it in official release

Upgrade the Kinesis connector so that its license is the Apache 2.0 License; after that, FLINK can include the
Kinesis connector directly in official releases.

[16] Support disk spilling in HeapKeyedStateBackend

FLIP-50: Spill-able Heap Keyed State Backend, proposed by Yu Li, has entered the development phase.

[17] [DISCUSS] Enhance Support for Multicast Communication Pattern

Yun Gao started a discussion on enhancing FLINK's multicast communication pattern; the improvement aims to support more complex communication between operators.

[18] CiBot Update

Chesnay Schepler added the ability to re-run tests by posting a comment to FLINK's pull request bot. This feature is also widely implemented in
communities such as ZooKeeper; contributors no longer need to push an empty commit or close and reopen a pull request
to trigger a re-run of the tests.

[19] [DISCUSS] Use Java's Duration instead of Flink's Time

Stephan Ewen started a discussion on replacing FLINK's `Time` with Java 8's `Duration`. FLINK previously had
two simple `Time` classes to represent durations handled in the runtime, which often confused developers and users.

[13]
https://lists.apache.org/x/thread.html/eb5e7b0579e5f1da1e9bf1ab4e4b86dba737946f0261d94d8c30521e@%3Cdev.flink.apache.org%3E
[14]
https://lists.apache.org/x/thread.html/72e5c211fb39ac1c596e12ae096d593ca30118dc12dcf664b7538624@%3Cdev.flink.apache.org%3E
[15]
https://lists.apache.org/x/thread.html/3876eec7aced42d2ac28728bc5084980ed7bf8ca6a6a8ed56e01e387@%3Cdev.flink.apache.org%3E
[16] https://issues.apache.org/jira/browse/FLINK-12692
[17]
https://lists.apache.org/x/thread.html/06834937769fda7c7afa4114e4f2f4ec84d95a54cc6ec46a5aa839de@%3Cdev.flink.apache.org%3E
[18]

Job memory keeps growing

2019-08-25 Thread 张坤
Hi:

   Recently I have been using Flink (1.7.2) to submit jobs to YARN (per-job mode). After running on YARN for a few hours the job gets killed, and I can see that the job's memory keeps growing. Memory parameters are set at submission time. The job logic is: Kafka data is lightly processed, registered as a table, and aggregated with windows.

Has anyone run into a similar problem? What is the cause, and how can it be solved or optimized? Thanks!



flink1.9 blink planner table ddl usage question

2019-08-25 Thread hb
Using a DDL statement with the flink 1.9 blink planner Table API, creating the table does not succeed. I am not sure whether some
definition properties are missing, whether I need to implement a TableSourceFactory, or whether it is something else.


Error message:  
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.




Code:
```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.{EnvironmentSettings, Types}
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row


object KafkaInDDL extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
settings)


  val sourceDDL =
"""create table sourceTable(
id int,
name varchar
  ) with (
'connector.type' = 'kafka',
'connector.property-version' = '1',
'update-mode' = 'append',
'bootstrap.servers' = '192.168.1.160:19092',
'connector.topic' = 'hbtest1',
'connector.startup-mode' = 'earliest-offset'
  )
"""
  tEnv.sqlUpdate(sourceDDL)
  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
  tEnv.execute("")
}
```

Re: OVER operator filtering out records

2019-08-25 Thread Vinod Mehra
[image: image.png]

When there are new events the old events just get stuck for many hours
(more than a day). So if there is a buffering going on it seems it is not
time based but size based (?). Looks like unless the buffered events exceed
a certain threshold they don't get flushed out (?). Is that what is going
on? Can someone confirm? Is there a way to flush out periodically?

Thanks,
Vinod

On Fri, Aug 23, 2019 at 10:37 PM Vinod Mehra  wrote:

> Although things improved during bootstrapping and when event volume was
> larger. As soon as the traffic slowed down, the events started getting stuck
> (buffered?) at the OVER operator for a very long time. Several hours.
>
> On Fri, Aug 23, 2019 at 5:04 PM Vinod Mehra  wrote:
>
>> (Forgot to mention that we are using Flink 1.4)
>>
>> Update: Earlier the OVER operator was assigned a parallelism of 64. I
>> reduced it to 1 and the problem went away! Now the OVER operator is not
>> filtering/buffering the events anymore.
>>
>> Can someone explain this please?
>>
>> Thanks,
>> Vinod
>>
>> On Fri, Aug 23, 2019 at 3:09 PM Vinod Mehra  wrote:
>>
>>> We have a SQL-based Flink job which consumes a very low volume stream
>>> (1 or 2 events in a few hours):
>>>
>>>
>>>
>>>
>>>
>>>
>>> *SELECT user_id,COUNT(*) OVER (PARTITION BY user_id ORDER BY rowtime
>>> RANGE INTERVAL '30' DAY PRECEDING) as count_30_days,
>>> COALESCE(occurred_at, logged_at) AS latency_marker,rowtimeFROM
>>> event_fooWHERE user_id IS NOT NULL*
>>>
>>> The OVER operator seems to filter out events as per the flink dashboard
>>> (records received =  records sent = 0)
>>>
>>> The operator looks like this:
>>>
>>> *over: (PARTITION BY: $1, ORDER BY: rowtime, RANGEBETWEEN 259200
>>> PRECEDING AND CURRENT ROW, select: (rowtime, $1, $2, COUNT(*) AS w0$o0)) ->
>>> select: ($1 AS user_id, w0$o0 AS count_30_days, $2 AS latency_marker,
>>> rowtime) -> to: Tuple2 -> Filter -> group_counter_count_30d.1.sqlRecords ->
>>> sample_without_formatter*
>>>
>>> I know that the OVER operator can discard late arriving events, but
>>> these events are not arriving late for sure. The watermark for all
>>> operators stay at 0 because the output events is 0.
>>>
>>> We have an exactly same SQL job against a high volume stream that is
>>> working fine. Watermarks progress in timely manner and events are delivered
>>> in timely manner as well.
>>>
>>> Any idea what could be going wrong? Are the events getting buffered
>>> waiting for certain number of events? If so, what is the threshold?
>>>
>>> Thanks,
>>> Vinod
>>>
>>
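One plausible explanation, offered as a sketch rather than a confirmed diagnosis of this job: an event-time operator only emits OVER results when its watermark advances, and the combined watermark is the minimum over all parallel input channels. With parallelism 64 and only a couple of events, most channels never see data, so the minimum never moves and nothing is emitted; that would also match the observation that parallelism 1 fixes it.

```
// Toy illustration of the min-over-channels rule (not Flink source code).
import java.util.Arrays;

public class MinWatermarkSketch {
    public static void main(String[] args) {
        long[] channelWatermarks = new long[64];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // 63 idle channels never advance
        channelWatermarks[0] = 1_566_700_000_000L;      // the single channel that saw data

        long combined = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            combined = Math.min(combined, wm);          // operator watermark = min of inputs
        }
        // combined is still Long.MIN_VALUE, so the OVER aggregation never fires.
        System.out.println("combined watermark = " + combined);
    }
}
```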


Re: type error with generics ..

2019-08-25 Thread Debasish Ghosh
Thanks for the feedback .. here are the details ..

Just to give you some background, the original API is a Scala API as follows ..

final def readStream[In: TypeInformation: DeserializationSchema](inlet:
CodecInlet[In]): DataStream[In] =
context.readStream(inlet)

and the *Scala version of the code runs fine* .. Here's the Java API (also
written in Scala though but passing type information and deserialization
schema explicitly and using the DataStream class from Flink Java) ..

final def readStream[In](inlet: CodecInlet[In], clazz: Class[In],
deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
context.readStream(inlet)(TypeInformation.of[In](clazz),
deserializationSchema)
  .javaStream

Here's the Java code for transformation where I get the error ..

DataStream ins =
  this.readStream(in, Data.class, serdeData)
  .map((Data d) -> d)
  .returns(new TypeHint(){}.getTypeInfo());

DataStream simples = ins.map((Data d) -> new Simple(d.getName()));
// .returns(new TypeHint(){}.getTypeInfo());
DataStreamSink sink = writeStream(out, simples, Simple.class,
serdeSimple);

Here's the corresponding Scala code that runs fine ..

val ins: DataStream[Data] = readStream(in)
val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
writeStream(out, simples)

Here's the custom source that's also referred in the exception .. the case
class is directly used in Scala while I use the Java API that uses that
case class from Java ..

object FlinkSource {
  case class CollectionSourceFunction[T](data: Seq[T]) extends
SourceFunction[T] {
def cancel(): Unit = {}
def run(ctx: SourceContext[T]): Unit = {
  data.foreach(d ⇒ ctx.collect(d))
}
  }

  /**
   * Java API
   */
  def collectionSourceFunction[T](data: java.util.List[T]) =
CollectionSourceFunction(data.asScala.toSeq)
}

Here's how I use the custom source from Java .. (which gives exception) ..
here data is a java.util.List

env.addSource(
  FlinkSource.collectionSourceFunction(data)
)

and here's the Scala version, which runs fine .. here data is a
scala.Seq[Data]

env.addSource(FlinkSource.CollectionSourceFunction(data))

Here's the complete exception ..

[info]   org.apache.flink.api.common.functions.InvalidTypesException: The
return type of function 'Custom Source' could not be determined
automatically, due to type erasure. You can give type information hints by
using the returns(...) method on the result of the transformation call, or
by letting your function implement the 'ResultTypeQueryable' interface.
[info]   at
org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
[info]   at
org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
[info]   at
org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
[info]   at
pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
[info]   at
pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
[info]   at
pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
[info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
[info]   at
pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
[info]   at pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
[info]   at
pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
[info]   ...
[info]   Cause:
org.apache.flink.api.common.functions.InvalidTypesException: Type of
TypeVariable 'T' in 'class
pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
determined. This is most likely a type erasure problem. The type extraction
currently supports types with generic variables only in cases where all
variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
[info]   at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
[info]   at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
[info]   at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
[info]   at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
[info]   at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
[info]   at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
[info]   at
pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
[info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[info]   at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[info]   at

Re: type error with generics ..

2019-08-25 Thread Rong Rong
I am not sure how the function `readStream` is implemented (also which
version of Flink are you using?).
Can you share more information on your code blocks and exception logs?

Also to answer your question, DataStream return type is determined by its
underlying transformation, so you cannot set it directly.

Thanks,
Rong

On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh 
wrote:

> Thanks .. I tried this ..
>
> DataStream ins = readStream(in, Data.class, serdeData).map((Data d)
> -> d).returns(new TypeHint(){}.getTypeInfo());
>
> But still get the same error on this line ..
>
> (BTW I am not sure how to invoke returns on a DataStream and hence had to
> do a fake map - any suggestions here ?)
>
> regards.
>
> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong  wrote:
>
>> Hi Debasish,
>>
>> I think the error refers to the output of your source instead of your
>> result of the map function. E.g.
>> DataStream ins = readStream(in, Data.class, serdeData)*.returns(new
>> TypeInformation);*
>> DataStream simples = ins.map((Data d) -> new Simple(d.getName()))
>> .returns(new TypeHint(){}.getTypeInfo());
>>
>> --
>> Rong
>>
>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh 
>> wrote:
>>
>>> Hello -
>>>
>>> I have the following call to addSource where I pass a Custom
>>> SourceFunction ..
>>>
>>> env.addSource(
>>>   new CollectionSourceFunctionJ(data, TypeInformation.of(new
>>> TypeHint(){}))
>>> )
>>>
>>> where data is List and CollectionSourceFunctionJ is a Scala case
>>> class ..
>>>
>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti:
>>> TypeInformation[T]) extends SourceFunction[T] {
>>>   def cancel(): Unit = {}
>>>   def run(ctx: SourceContext[T]): Unit = {
>>> data.asScala.foreach(d ⇒ ctx.collect(d))
>>>   }
>>> }
>>>
>>> When the following transformation runs ..
>>>
>>> DataStream ins = readStream(in, Data.class, serdeData);
>>> DataStream simples = ins.map((Data d) -> new
>>> Simple(d.getName())).returns(new TypeHint(){}.getTypeInfo());
>>>
>>> I get the following exception in the second line ..
>>>
>>> org.apache.flink.api.common.functions.InvalidTypesException: The return
 type of function 'Custom Source' could not be determined automatically, due
 to type erasure. You can give type information hints by using the
 returns(...) method on the result of the transformation call, or by letting
 your function implement the 'ResultTypeQueryable' interface.
>>>
>>>
>>> Initially the returns call was not there and I was getting the same
>>> exception. Now after adding the returns call, nothing changes.
>>>
>>> Any help will be appreciated ..
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


tumbling event time window , parallel

2019-08-25 Thread Hanan Yehudai
I have an issue with tumbling windows running  in parallel.

I run a Job on  a set of CSV files.

When the parallelism is set to 1, I get the proper results.
When it runs in parallel, I get no output.
Is it due to the fact that the parallel streams take the MAX(watermark) from all
the parallel sources,
and only one of the streams advances the watermark?

It seems wrong that the result is not deterministic and depends on the
parallelism.
What am I doing wrong?



Re: Using shell environment variables

2019-08-25 Thread Vishwas Siravara
You can also link at runtime by providing the path to the dylib, by adding
-Djava.library.path= to the JVM options of the task manager.

On Sat, Aug 24, 2019 at 9:11 PM Zhu Zhu  wrote:

> Hi Abhishek,
>
> You need to export the environment variables on all the worker
> machines(not the machine to submit the job).
>
> Alternatively, if you are submitting the job to a yarn cluster, you can
> use flink conf prefix "containerized.taskmanager.env." to add environment
> variables to Flink's task manager process.
> For example for passing LD_LIBRARY_PATH as an env variable to the workers,
> set: containerized.taskmanager.env.LD_LIBRARY_PATH: "/usr/lib/native" in
> the flink-conf.yaml.
>
> Thanks,
> Zhu Zhu
>
> Abhishek Jain wrote on Sun, Aug 25, 2019, at 2:48 AM:
>
>> Hi Miki,
>> Thanks for your reply. ParameterTool will only help in making the value
>> accessible through ParameterTool.get(). However, I need a way of accessing
>> the value using "System.getenv" since the underlying library uses it so.
>>
>> On Sat, 24 Aug 2019 at 23:04, miki haiat  wrote:
>>
>>> Did you register your system environment parameter ?
>>>
>>> You can find here several ways to use configuration data [1]
>>>
>>> 1.
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html
>>>
>>>
>>> On Sat, Aug 24, 2019, 20:26 Abhishek Jain  wrote:
>>>
 Hi!

 I am using a library that depends on a certain environment variable set
 (mandatorily). Now, I've exported this variable in my environment but
 somehow it's not being read by the task manager. Following is the exception
 I get when I try to run the job:

 Caused by: com.example.MyCustomException: Env token is null
 at com.example.AerospikeSink.open(AerospikeSink.java:47)
 at
 org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at
 org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:745)

 Here's the code that throws this exception:

 @Override
 public void open(Configuration config) throws Exception {
 if (System.getenv("API_TOKEN") == null) {
 throw new MyCustomException("Env token is null");
 }
 }

 My question: Is there an alternative to System.getenv() that I can use
 to access environment variables inside of flink task?

 ( P.S. I've only copied relevant code snippet to avoid confusion. I do
 intend to use API_TOKEN later on. )

 --
 Warm Regards,
 Abhishek Jain

>>>
>>
>> --
>> Warm Regards,
>> Abhishek Jain
>>
>
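A sketch of Zhu Zhu's suggestion applied to the API_TOKEN variable from the original message (flink-conf.yaml for a YARN deployment; the value is a placeholder):

```
# Make System.getenv("API_TOKEN") work inside every TaskManager container.
containerized.taskmanager.env.API_TOKEN: "my-secret-token"
# If the JobManager needs it as well:
containerized.master.env.API_TOKEN: "my-secret-token"
```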


Error while using catalog in .yaml file

2019-08-25 Thread Yebgenya Lazarkhosrouabadi
Hello,

I'm trying to use hivecatalog in flink1.9. I modified the yaml file like this:


catalogs:
  - name: mynewhive
type: hive
hive-conf-dir: /home/user/Downloads/apache-hive-1.2.2-bin/conf
default-database: myhive


But when I try to run ./sql-client.sh embedded  I get this error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
The configured environment is invalid. Please check your environment files 
again.
   at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
   at 
org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
   at 
org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:553)
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:373)
   at 
org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
   ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.CatalogFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
default-database=myhive
hive-conf-dir=/home/bernadette/Downloads/apache-hive-1.2.2-bin/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
   at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
   at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
   at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
   at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:258)
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:136)
   at java.util.HashMap.forEach(HashMap.java:1289)
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:135)
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:549)
   ... 4 more



How can I get rid of this error?

Best regards
Yebgenya Lazar
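The "CatalogFactory ... No context matches" error usually means the Hive catalog factory is not on the SQL Client's classpath at all. A hedged sketch of what typically has to be copied into Flink's lib/ directory for Flink 1.9 with Hive 1.2.x (the exact jar names and the full list depend on your versions, so check the Hive integration documentation):

```
cp flink-connector-hive_2.11-1.9.0.jar  $FLINK_HOME/lib/
cp hive-metastore-1.2.2.jar             $FLINK_HOME/lib/
cp hive-exec-1.2.2.jar                  $FLINK_HOME/lib/
```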


Re: Use logback instead of log4j

2019-08-25 Thread Vishwas Siravara
Any idea on how I can use logback instead?

On Fri, Aug 23, 2019 at 1:22 PM Vishwas Siravara 
wrote:

> Hi ,
> From the flink doc , in order to use logback instead of log4j " Users
> willing to use logback instead of log4j can just exclude log4j (or delete
> it from the lib/ folder)."
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html
>  .
>
> However when i delete it from the lib and start the cluster , there are no
> logs generated , instead I see console log which says "Failed to
> instantiate SLF4J LoggerFactory"
>
> Reported exception:
> java.lang.NoClassDefFoundError: org/apache/log4j/Level
> at org.slf4j.LoggerFactory.bind(LoggerFactory.java:143)
> at 
> org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:122)
> at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:378)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:328)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
> at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.(ClusterEntrypoint.java:98)
>
>
> How can I use logback instead ?
>
>
> Thanks,
> Vishwas
>
>
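The NoClassDefFoundError above suggests the slf4j-to-log4j binding is still on the classpath while log4j itself was removed. A sketch of the usual swap in the distribution's lib/ folder (the jar version numbers are assumptions, use the ones that match your setup):

```
# remove log4j and its slf4j binding
rm $FLINK_HOME/lib/log4j-1.2.17.jar
rm $FLINK_HOME/lib/slf4j-log4j12-1.7.15.jar
# add logback
cp logback-core-1.2.3.jar    $FLINK_HOME/lib/
cp logback-classic-1.2.3.jar $FLINK_HOME/lib/
```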


Re: Externalized checkpoints

2019-08-25 Thread Vishwas Siravara
Got it.Thank you

On Thu, Aug 22, 2019 at 8:54 PM Congxian Qiu  wrote:

> Hi, Vishwas
>
> As Zhu Zhu said, you can set "state.checkpoints.num-retained"[1] to
> specify the maximum number of completed checkpoints to retain.
> maybe you can also ref the external checkpoint cleanup type[2] config for
> how to clean up the retained checkpoint[2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/checkpointing.html#related-config-options
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#retained-checkpoints
> Best,
> Congxian
>
>
> Zhu Zhu wrote on Thu, Aug 22, 2019, at 10:13 AM:
>
>> Hi Vishwas,
>>
>> You can configure "state.checkpoints.num-retained" to specify the max
>> checkpoints to retain.
>> By default it is 1.
>>
>> Thanks,
>> Zhu Zhu
>>
>> Vishwas Siravara wrote on Thu, Aug 22, 2019, at 6:48 AM:
>>
>>> I am also using exactly once checkpointing mode, I have a kafka source
>>> and sink so both support transactions which should allow for exactly once
>>> processing. Is this the reason why there is only one checkpoint retained ?
>>>
>>> Thanks,
>>> Vishwas
>>>
>>> On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
>>> wrote:
>>>
 Hi peeps,
 I am externalizing checkpoints in S3 for my flink job and I retain them
 on cancellation. However when I look into my S3 bucket where the
 checkpoints are stored there is only 1 checkpoint at any point in time . Is
 this the default behavior of flink where older checkpoints are deleted when
 the current checkpoint completes ? Here are a few screenshots. What are
 your thoughts on restoring an older state which is not the previous state ?

 List contents of bucket at time 0

 Object Name: 
 checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4Last
  modified time : Wed Aug 21 22:17:23 GMT 2019
 Object Name: 
 checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadataLast modified 
 time : Wed Aug 21 22:17:24 GMT 2019

 List contents of bucket at time 1

 Printing last modified times
 Object Name: 
 checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518Last
  modified time : Wed Aug 21 22:23:24 GMT 2019
 Object Name: 
 checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadataLast modified 
 time : Wed Aug 21 22:23:24 GMT 2019

 Thanks,

 Vishwas
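A sketch of the setting discussed above (flink-conf.yaml; 3 is just an example value, the default is 1):

```
state.checkpoints.num-retained: 3
```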




RE: timeout error while connecting to Kafka

2019-08-25 Thread Eyal Pe'er
Nope, I submitted it through the Flink job master itself by running flink run -c 
  sandbox.jar

Best regards
Eyal Peer / Data Platform Developer

From: miki haiat 
Sent: Sunday, August 25, 2019 4:21 PM
To: Eyal Pe'er 
Cc: user 
Subject: Re: timeout error while connecting to Kafka

I'm trying to understand.
Did you submit your jar through the Flink web UI,
and then you got the timeout error?

On Sun, Aug 25, 2019, 16:10 Eyal Pe'er <eyal.p...@startapp.com> wrote:
What do you mean by “remote cluster”?
I tried to run dockerized Flink version 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html)
 on a remote machine and to submit a job that is supposed to communicate with 
Kafka, but still I cannot access the topic.


Best regards
Eyal Peer / Data Platform Developer

From: miki haiat <miko5...@gmail.com>
Sent: Sunday, August 25, 2019 3:50 PM
To: Eyal Pe'er <eyal.p...@startapp.com>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Did you try to submit it to  remote cluster ?


On Sun, Aug 25, 2019 at 2:55 PM Eyal Pe'er <eyal.p...@startapp.com> wrote:
BTW, the exception that I see in the log is: ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception 
occurred in REST handler…
Best regards
Eyal Peer / Data Platform Developer

From: Eyal Pe'er <eyal.p...@startapp.com>
Sent: Sunday, August 25, 2019 2:20 PM
To: miki haiat <miko5...@gmail.com>
Cc: user@flink.apache.org
Subject: RE: timeout error while connecting to Kafka

Hi,
I removed that dependency, but it still fails.
The reason I used Flink 1.5.0 is that I followed a tutorial which used 
it (https://www.baeldung.com/kafka-flink-data-pipeline).
If needed, I can change it.

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to 
connect zookeeper, instead of the bootstrap servers ?
I know that in Spark streaming we consume via zookeeper ("zookeeper.connect").
I saw that in Apache Flink-Kafka connector zookeeper.connect  only required for 
Kafka 0.8, but maybe I still need to use it ?
Best regards
Eyal Peer / Data Platform Developer

From: miki haiat <miko5...@gmail.com>
Sent: Thursday, August 22, 2019 2:29 PM
To: Eyal Pe'er <eyal.p...@startapp.com>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you try to remove this from your pom file .
 
org.apache.flink
flink-connector-kafka_2.11
1.7.0



Is there any reason why you are using Flink 1.5 and not the latest release?


Best,

Miki

On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er <eyal.p...@startapp.com> wrote:
Hi Miki,
First, I would like to thank you for the fast response.
I rechecked Kafka and it is up and running fine.
I’m still getting the same error (Timeout expired while fetching topic 
metadata).
Maybe my Flink version is wrong (Kafka version is 0.9)?


org.apache.flink
flink-core
1.5.0


org.apache.flink
flink-connector-kafka-0.11_2.11
1.5.0


org.apache.flink
flink-streaming-java_2.11
1.5.0


org.apache.flink
flink-java
1.5.0


org.apache.flink
flink-clients_2.10
1.1.4


org.apache.flink
flink-connector-kafka_2.11
1.7.0



Best regards
Eyal Peer / Data Platform Developer

From: miki haiat <miko5...@gmail.com>
Sent: Thursday, August 22, 2019 11:03 AM
To: Eyal Pe'er <eyal.p...@startapp.com>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you double check that the kafka instance is up ?
The code looks fine.


Best,

Miki

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er <eyal.p...@startapp.com> wrote:
Hi,
I'm trying to consume events using Apache Flink.
The code is very basic, trying to connect the topic split words by space and 
print it to the console. Kafka version is 0.9.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 

Re: timeout error while connecting to Kafka

2019-08-25 Thread miki haiat
I'm trying to understand.
Did you submit your jar through the Flink web UI,
and then you got the timeout error?

On Sun, Aug 25, 2019, 16:10 Eyal Pe'er  wrote:

> What do you mean by “remote cluster”?
>
> I tried to run dockerized Flink version (
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html)
> on a remote machine and to submit a job that supposed to communicate with
> Kafka, but still I cannot access the topic.
>
>
>
>
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* miki haiat 
> *Sent:* Sunday, August 25, 2019 3:50 PM
> *To:* Eyal Pe'er 
> *Cc:* user@flink.apache.org
> *Subject:* Re: timeout error while connecting to Kafka
>
>
>
> Did you try to submit it to  remote cluster ?
>
>
>
>
>
> On Sun, Aug 25, 2019 at 2:55 PM Eyal Pe'er  wrote:
>
> BTW, the exception that I see in the log is: ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception
> occurred in REST handler…
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* Eyal Pe'er 
> *Sent:* Sunday, August 25, 2019 2:20 PM
> *To:* miki haiat 
> *Cc:* user@flink.apache.org
> *Subject:* RE: timeout error while connecting to Kafka
>
>
>
> Hi,
>
> I removed that dependency, but it still fails.
>
> The reason why I used Kafka 1.5.0 is because I followed a training which
> used it (https://www.baeldung.com/kafka-flink-data-pipeline).
>
> If needed, I can change it.
>
>
>
> I’m not sure, but maybe in order to consume events from Kafka 0.9 I need
> to connect zookeeper, instead of the bootstrap servers ?
>
> I know that in Spark streaming we consume via zookeeper
> ("zookeeper.connect").
>
> I saw that in Apache Flink-Kafka connector zookeeper.connect  only
> required for Kafka 0.8, but maybe I still need to use it ?
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* miki haiat 
> *Sent:* Thursday, August 22, 2019 2:29 PM
> *To:* Eyal Pe'er 
> *Cc:* user@flink.apache.org
> *Subject:* Re: timeout error while connecting to Kafka
>
>
>
> Can you try to remove this from your pom file .
>
>  
>
> org.apache.flink
>
> flink-connector-kafka_2.11
>
> 1.7.0
>
> 
>
>
>
>
>
> Is their any reason why you are using flink 1.5 and not latest release.
>
>
>
>
>
> Best,
>
>
> Miki
>
>
>
> On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er  wrote:
>
> Hi Miki,
>
> First, I would like to thank you for the fast response.
>
> I recheck Kafka and it is up and running fine.
>
> I’m still getting the same error (Timeout expired while fetching topic
> metadata).
>
> Maybe my Flink version is wrong (Kafka version is 0.9)?
>
>
>
> 
>
> org.apache.flink
>
> flink-core
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-connector-kafka-0.11_2.11
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-streaming-java_2.11
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-java
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-clients_2.10
>
> 1.1.4
>
> 
>
> 
>
> org.apache.flink
>
> flink-connector-kafka_2.11
>
> 1.7.0
>
> 
>
>
>
>
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* miki haiat 
> *Sent:* Thursday, August 22, 2019 11:03 AM
> *To:* Eyal Pe'er 
> *Cc:* user@flink.apache.org
> *Subject:* Re: timeout error while connecting to Kafka
>
>
>
> Can you double check that the kafka instance is up ?
> The code looks fine.
>
>
>
>
>
> Best,
>
>
>
> Miki
>
>
>
> On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er 
> wrote:
>
> Hi,
>
> I'm trying to consume events using Apache Flink.
>
> The code is very basic, trying to connect the topic split words by space
> and print it to the console. Kafka version is 0.9.
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>
>
>
> import org.apache.flink.streaming.api.datastream.DataStream;
>
> import org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment;
>
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
>
>
> public class KafkaStreaming {
>
>
>
> public static void main(String[] args) throws Exception {
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
> .getExecutionEnvironment();
>
>
>
> Properties props = new Properties();
>
> props.setProperty("bootstrap.servers", "kafka servers:9092...");
>
> props.setProperty("group.id", "flinkPOC");
>
> FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
> "topic", new SimpleStringSchema(), props);
>
>
>
> DataStream dataStream = env.addSource(consumer);
>
>
>
> 

RE: timeout error while connecting to Kafka

2019-08-25 Thread Eyal Pe'er
What do you mean by “remote cluster”?
I tried to run the dockerized Flink version
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html)
on a remote machine and to submit a job that is supposed to communicate with
Kafka, but I still cannot access the topic.
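If the job is meant to run on that remote Docker cluster, one option besides the web UI is to point the program at the remote JobManager directly from the IDE. The following is only a sketch: the hostname, port and jar path are placeholders, so use the JobManager address and submission port of your own setup.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmit {
    public static void main(String[] args) throws Exception {
        // Placeholder host, port and jar path -- not values taken from this thread.
        // The jar must contain the job classes (including the Kafka connector).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "flink-jobmanager-host", 8081, "/path/to/kafka-flink-job.jar");

        // Trivial pipeline just to verify that submission to the remote cluster works;
        // the real job would add the Kafka source and the Splitter here instead.
        env.fromElements("hello", "remote", "cluster").print();

        env.execute("Remote submission smoke test");
    }
}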


Best regards
Eyal Peer / Data Platform Developer

From: miki haiat 
Sent: Sunday, August 25, 2019 3:50 PM
To: Eyal Pe'er 
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Did you try to submit it to a remote cluster?


On Sun, Aug 25, 2019 at 2:55 PM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
BTW, the exception that I see in the log is: ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception 
occurred in REST handler…
Best regards
Eyal Peer / Data Platform Developer

From: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Sent: Sunday, August 25, 2019 2:20 PM
To: miki haiat mailto:miko5...@gmail.com>>
Cc: user@flink.apache.org
Subject: RE: timeout error while connecting to Kafka

Hi,
I removed that dependency, but it still fails.
I used Flink 1.5.0 because I followed a training which used it
(https://www.baeldung.com/kafka-flink-data-pipeline).
If needed, I can change it.

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to
connect to ZooKeeper instead of the bootstrap servers?
I know that in Spark Streaming we consume via ZooKeeper ("zookeeper.connect").
I saw that in the Apache Flink Kafka connector zookeeper.connect is only required
for Kafka 0.8, but maybe I still need to use it?
Best regards
Eyal Peer / Data Platform Developer

From: miki haiat mailto:miko5...@gmail.com>>
Sent: Thursday, August 22, 2019 2:29 PM
To: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you try to remove this from your pom file .
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Is there any reason why you are using Flink 1.5 and not the latest release?


Best,

Miki

On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi Miki,
First, I would like to thank you for the fast response.
I recheck Kafka and it is up and running fine.
I’m still getting the same error (Timeout expired while fetching topic 
metadata).
Maybe my Flink version is wrong (Kafka version is 0.9)?


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Best regards
Eyal Peer / Data Platform Developer

From: miki haiat mailto:miko5...@gmail.com>>
Sent: Thursday, August 22, 2019 11:03 AM
To: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you double check that the kafka instance is up ?
The code looks fine.


Best,

Miki

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi,
I'm trying to consume events using Apache Flink.
The code is very basic, trying to connect the topic split words by space and 
print it to the console. Kafka version is 0.9.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka servers:9092...");
props.setProperty("group.id", "flinkPOC");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic",
new SimpleStringSchema(), props);

DataStream<String> dataStream = env.addSource(consumer);

DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
wordDataStream.print();
env.execute("Word Split");

}

public static class Splitter implements FlatMapFunction<String, String> {

public void 

Re: timeout error while connecting to Kafka

2019-08-25 Thread miki haiat
Did you try to submit it to a remote cluster?


On Sun, Aug 25, 2019 at 2:55 PM Eyal Pe'er  wrote:

> BTW, the exception that I see in the log is: ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception
> occurred in REST handler…
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* Eyal Pe'er 
> *Sent:* Sunday, August 25, 2019 2:20 PM
> *To:* miki haiat 
> *Cc:* user@flink.apache.org
> *Subject:* RE: timeout error while connecting to Kafka
>
>
>
> Hi,
>
> I removed that dependency, but it still fails.
>
> The reason why I used Kafka 1.5.0 is because I followed a training which
> used it (https://www.baeldung.com/kafka-flink-data-pipeline).
>
> If needed, I can change it.
>
>
>
> I’m not sure, but maybe in order to consume events from Kafka 0.9 I need
> to connect zookeeper, instead of the bootstrap servers ?
>
> I know that in Spark streaming we consume via zookeeper
> ("zookeeper.connect").
>
> I saw that in Apache Flink-Kafka connector zookeeper.connect  only
> required for Kafka 0.8, but maybe I still need to use it ?
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* miki haiat 
> *Sent:* Thursday, August 22, 2019 2:29 PM
> *To:* Eyal Pe'er 
> *Cc:* user@flink.apache.org
> *Subject:* Re: timeout error while connecting to Kafka
>
>
>
> Can you try to remove this from your pom file .
>
>  
>
> org.apache.flink
>
> flink-connector-kafka_2.11
>
> 1.7.0
>
> 
>
>
>
>
>
> Is their any reason why you are using flink 1.5 and not latest release.
>
>
>
>
>
> Best,
>
>
> Miki
>
>
>
> On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er  wrote:
>
> Hi Miki,
>
> First, I would like to thank you for the fast response.
>
> I recheck Kafka and it is up and running fine.
>
> I’m still getting the same error (Timeout expired while fetching topic
> metadata).
>
> Maybe my Flink version is wrong (Kafka version is 0.9)?
>
>
>
> 
>
> org.apache.flink
>
> flink-core
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-connector-kafka-0.11_2.11
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-streaming-java_2.11
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-java
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-clients_2.10
>
> 1.1.4
>
> 
>
> 
>
> org.apache.flink
>
> flink-connector-kafka_2.11
>
> 1.7.0
>
> 
>
>
>
>
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* miki haiat 
> *Sent:* Thursday, August 22, 2019 11:03 AM
> *To:* Eyal Pe'er 
> *Cc:* user@flink.apache.org
> *Subject:* Re: timeout error while connecting to Kafka
>
>
>
> Can you double check that the kafka instance is up ?
> The code looks fine.
>
>
>
>
>
> Best,
>
>
>
> Miki
>
>
>
> On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er 
> wrote:
>
> Hi,
>
> I'm trying to consume events using Apache Flink.
>
> The code is very basic, trying to connect the topic split words by space
> and print it to the console. Kafka version is 0.9.
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>
>
>
> import org.apache.flink.streaming.api.datastream.DataStream;
>
> import org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment;
>
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
>
>
> public class KafkaStreaming {
>
>
>
> public static void main(String[] args) throws Exception {
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
> .getExecutionEnvironment();
>
>
>
> Properties props = new Properties();
>
> props.setProperty("bootstrap.servers", "kafka servers:9092...");
>
> props.setProperty("group.id", "flinkPOC");
>
> FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
> "topic", new SimpleStringSchema(), props);
>
>
>
> DataStream dataStream = env.addSource(consumer);
>
>
>
> DataStream wordDataStream = dataStream.flatMap(new Splitter
> ());
>
> wordDataStream.print();
>
> env.execute("Word Split");
>
>
>
> }
>
>
>
> public static class Splitter implements FlatMapFunction {
>
>
>
> public void flatMap(String sentence, Collector out) throws
> Exception {
>
>
>
> for (String word : sentence.split(" ")) {
>
> out.collect(word);
>
> }
>
> }
>
>
>
> }
>
> }
>
>
>
> The app does not print anything to the screen (although I produced events
> to Kafka).
>
> I tried to skip the Splitter FlatMap function, but still nothing happens.
> SSL or any kind of authentication is not required from Kafka.
>
> This is the error that I found in the logs:
>
> 

RE: timeout error while connecting to Kafka

2019-08-25 Thread Eyal Pe'er
Replication factor is 1. In most of my topics this is the case.
Is it a problem to consume events from non-replicated topics ?

Best regards
Eyal Peer / Data Platform Developer

From: Yitzchak Lieberman 
Sent: Sunday, August 25, 2019 3:13 PM
To: Eyal Pe'er 
Cc: miki haiat ; user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

What is the topic replication factor? How many Kafka brokers do you have?
I was facing the same exception when one of my brokers was down and the topic
had no replicas (replication_factor=1).

On Sun, Aug 25, 2019 at 2:55 PM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
BTW, the exception that I see in the log is: ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception 
occurred in REST handler…
Best regards
Eyal Peer / Data Platform Developer

From: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Sent: Sunday, August 25, 2019 2:20 PM
To: miki haiat mailto:miko5...@gmail.com>>
Cc: user@flink.apache.org
Subject: RE: timeout error while connecting to Kafka

Hi,
I removed that dependency, but it still fails.
I used Flink 1.5.0 because I followed a training which used it
(https://www.baeldung.com/kafka-flink-data-pipeline).
If needed, I can change it.

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to
connect to ZooKeeper instead of the bootstrap servers?
I know that in Spark Streaming we consume via ZooKeeper ("zookeeper.connect").
I saw that in the Apache Flink Kafka connector zookeeper.connect is only required
for Kafka 0.8, but maybe I still need to use it?
Best regards
Eyal Peer / Data Platform Developer

From: miki haiat mailto:miko5...@gmail.com>>
Sent: Thursday, August 22, 2019 2:29 PM
To: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you try to remove this from your pom file .
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Is there any reason why you are using Flink 1.5 and not the latest release?


Best,

Miki

On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi Miki,
First, I would like to thank you for the fast response.
I recheck Kafka and it is up and running fine.
I’m still getting the same error (Timeout expired while fetching topic 
metadata).
Maybe my Flink version is wrong (Kafka version is 0.9)?


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Best regards
Eyal Peer / Data Platform Developer

From: miki haiat mailto:miko5...@gmail.com>>
Sent: Thursday, August 22, 2019 11:03 AM
To: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you double check that the kafka instance is up ?
The code looks fine.


Best,

Miki

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi,
I'm trying to consume events using Apache Flink.
The code is very basic, trying to connect the topic split words by space and 
print it to the console. Kafka version is 0.9.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka servers:9092...");
props.setProperty("group.id", "flinkPOC");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic",
new SimpleStringSchema(), props);

DataStream<String> dataStream = env.addSource(consumer);

DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
wordDataStream.print();
env.execute("Word Split");

}

public static class Splitter implements FlatMapFunction<String, String> {

public 

Re: timeout error while connecting to Kafka

2019-08-25 Thread Yitzchak Lieberman
What is the topic replication factor? How many Kafka brokers do you have?
I was facing the same exception when one of my brokers was down and the
topic had no replicas (replication_factor=1).
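A quick way to answer both questions is to read the partition metadata for the topic with a plain Kafka consumer pointed at the same bootstrap servers. This is only a sketch; the broker address and topic name below are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class TopicMetadataCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-host:9092"); // placeholder
        props.setProperty("group.id", "metadata-check");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor() fetches the topic metadata, so it fails with a similar
            // timeout if the brokers cannot be reached from this machine.
            for (PartitionInfo p : consumer.partitionsFor("topic")) { // placeholder topic
                System.out.println("partition " + p.partition()
                        + ", leader " + p.leader()
                        + ", replicas " + p.replicas().length);
            }
        }
    }
}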

On Sun, Aug 25, 2019 at 2:55 PM Eyal Pe'er  wrote:

> BTW, the exception that I see in the log is: ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception
> occurred in REST handler…
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* Eyal Pe'er 
> *Sent:* Sunday, August 25, 2019 2:20 PM
> *To:* miki haiat 
> *Cc:* user@flink.apache.org
> *Subject:* RE: timeout error while connecting to Kafka
>
>
>
> Hi,
>
> I removed that dependency, but it still fails.
>
> The reason why I used Kafka 1.5.0 is because I followed a training which
> used it (https://www.baeldung.com/kafka-flink-data-pipeline).
>
> If needed, I can change it.
>
>
>
> I’m not sure, but maybe in order to consume events from Kafka 0.9 I need
> to connect zookeeper, instead of the bootstrap servers ?
>
> I know that in Spark streaming we consume via zookeeper
> ("zookeeper.connect").
>
> I saw that in Apache Flink-Kafka connector zookeeper.connect  only
> required for Kafka 0.8, but maybe I still need to use it ?
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* miki haiat 
> *Sent:* Thursday, August 22, 2019 2:29 PM
> *To:* Eyal Pe'er 
> *Cc:* user@flink.apache.org
> *Subject:* Re: timeout error while connecting to Kafka
>
>
>
> Can you try to remove this from your pom file .
>
>  
>
> org.apache.flink
>
> flink-connector-kafka_2.11
>
> 1.7.0
>
> 
>
>
>
>
>
> Is their any reason why you are using flink 1.5 and not latest release.
>
>
>
>
>
> Best,
>
>
> Miki
>
>
>
> On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er  wrote:
>
> Hi Miki,
>
> First, I would like to thank you for the fast response.
>
> I recheck Kafka and it is up and running fine.
>
> I’m still getting the same error (Timeout expired while fetching topic
> metadata).
>
> Maybe my Flink version is wrong (Kafka version is 0.9)?
>
>
>
> 
>
> org.apache.flink
>
> flink-core
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-connector-kafka-0.11_2.11
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-streaming-java_2.11
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-java
>
> 1.5.0
>
> 
>
> 
>
> org.apache.flink
>
> flink-clients_2.10
>
> 1.1.4
>
> 
>
> 
>
> org.apache.flink
>
> flink-connector-kafka_2.11
>
> 1.7.0
>
> 
>
>
>
>
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
> *From:* miki haiat 
> *Sent:* Thursday, August 22, 2019 11:03 AM
> *To:* Eyal Pe'er 
> *Cc:* user@flink.apache.org
> *Subject:* Re: timeout error while connecting to Kafka
>
>
>
> Can you double check that the kafka instance is up ?
> The code looks fine.
>
>
>
>
>
> Best,
>
>
>
> Miki
>
>
>
> On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er 
> wrote:
>
> Hi,
>
> I'm trying to consume events using Apache Flink.
>
> The code is very basic, trying to connect the topic split words by space
> and print it to the console. Kafka version is 0.9.
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>
>
>
> import org.apache.flink.streaming.api.datastream.DataStream;
>
> import org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment;
>
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
>
>
> public class KafkaStreaming {
>
>
>
> public static void main(String[] args) throws Exception {
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
> .getExecutionEnvironment();
>
>
>
> Properties props = new Properties();
>
> props.setProperty("bootstrap.servers", "kafka servers:9092...");
>
> props.setProperty("group.id", "flinkPOC");
>
> FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
> "topic", new SimpleStringSchema(), props);
>
>
>
> DataStream dataStream = env.addSource(consumer);
>
>
>
> DataStream wordDataStream = dataStream.flatMap(new Splitter
> ());
>
> wordDataStream.print();
>
> env.execute("Word Split");
>
>
>
> }
>
>
>
> public static class Splitter implements FlatMapFunction {
>
>
>
> public void flatMap(String sentence, Collector out) throws
> Exception {
>
>
>
> for (String word : sentence.split(" ")) {
>
> out.collect(word);
>
> }
>
> }
>
>
>
> }
>
> }
>
>
>
> The app does not print anything to the screen (although I produced events
> to Kafka).
>
> I tried to skip the Splitter FlatMap function, but 

RE: timeout error while connecting to Kafka

2019-08-25 Thread Eyal Pe'er
BTW, the exception that I see in the log is: ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception 
occurred in REST handler…
Best regards
Eyal Peer / Data Platform Developer

From: Eyal Pe'er 
Sent: Sunday, August 25, 2019 2:20 PM
To: miki haiat 
Cc: user@flink.apache.org
Subject: RE: timeout error while connecting to Kafka

Hi,
I removed that dependency, but it still fails.
I used Flink 1.5.0 because I followed a training which used it
(https://www.baeldung.com/kafka-flink-data-pipeline).
If needed, I can change it.

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to
connect to ZooKeeper instead of the bootstrap servers?
I know that in Spark Streaming we consume via ZooKeeper ("zookeeper.connect").
I saw that in the Apache Flink Kafka connector zookeeper.connect is only required
for Kafka 0.8, but maybe I still need to use it?
Best regards
Eyal Peer / Data Platform Developer

From: miki haiat mailto:miko5...@gmail.com>>
Sent: Thursday, August 22, 2019 2:29 PM
To: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you try to remove this from your pom file .
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Is there any reason why you are using Flink 1.5 and not the latest release?


Best,

Miki

On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi Miki,
First, I would like to thank you for the fast response.
I recheck Kafka and it is up and running fine.
I’m still getting the same error (Timeout expired while fetching topic 
metadata).
Maybe my Flink version is wrong (Kafka version is 0.9)?


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Best regards
Eyal Peer / Data Platform Developer

From: miki haiat mailto:miko5...@gmail.com>>
Sent: Thursday, August 22, 2019 11:03 AM
To: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you double check that the kafka instance is up ?
The code looks fine.


Best,

Miki

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi,
I'm trying to consume events using Apache Flink.
The code is very basic, trying to connect the topic split words by space and 
print it to the console. Kafka version is 0.9.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka servers:9092...");
props.setProperty("group.id", "flinkPOC");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic",
new SimpleStringSchema(), props);

DataStream<String> dataStream = env.addSource(consumer);

DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
wordDataStream.print();
env.execute("Word Split");

}

public static class Splitter implements FlatMapFunction<String, String> {

public void flatMap(String sentence, Collector<String> out) throws
Exception {

for (String word : sentence.split(" ")) {
out.collect(word);
}
}

}
}

The app does not print anything to the screen (although I produced events to 
Kafka).
I tried to skip the Splitter FlatMap function, but still nothing happens. Kafka
does not require SSL or any other kind of authentication.
This is the error that I found in the logs:
2019-08-20 14:36:17,654 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source -> Flat Map -> Sink: Print to Std. Out (1/1) 
(02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching 

RE: timeout error while connecting to Kafka

2019-08-25 Thread Eyal Pe'er
Hi,
I removed that dependency, but it still fails.
I used Flink 1.5.0 because I followed a training which used it
(https://www.baeldung.com/kafka-flink-data-pipeline).
If needed, I can change it.

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to
connect to ZooKeeper instead of the bootstrap servers?
I know that in Spark Streaming we consume via ZooKeeper ("zookeeper.connect").
I saw that in the Apache Flink Kafka connector zookeeper.connect is only required
for Kafka 0.8, but maybe I still need to use it?
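For what it is worth, the 0.9+ connectors read all metadata through the brokers, so bootstrap.servers (plus group.id) should be enough; zookeeper.connect is only read by the 0.8 connector. A sketch of the properties the code would need, with placeholder addresses:

import java.util.Properties;

public class ConsumerProps {
    // Builds the properties used by FlinkKafkaConsumer09 in the code quoted below.
    // The broker and ZooKeeper addresses are placeholders, not values from this thread.
    public static Properties kafka09Props() {
        Properties props = new Properties();
        // Kafka 0.9+ connectors need the brokers, not ZooKeeper:
        props.setProperty("bootstrap.servers", "kafka-host:9092");
        props.setProperty("group.id", "flinkPOC");
        // Only the legacy FlinkKafkaConsumer08 would additionally need:
        // props.setProperty("zookeeper.connect", "zookeeper-host:2181");
        return props;
    }
}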
Best regards
Eyal Peer / Data Platform Developer

From: miki haiat 
Sent: Thursday, August 22, 2019 2:29 PM
To: Eyal Pe'er 
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you try to remove this from your pom file .
 
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Is there any reason why you are using Flink 1.5 and not the latest release?


Best,

Miki

On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi Miki,
First, I would like to thank you for the fast response.
I recheck Kafka and it is up and running fine.
I’m still getting the same error (Timeout expired while fetching topic 
metadata).
Maybe my Flink version is wrong (Kafka version is 0.9)?


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>



Best regards
Eyal Peer / Data Platform Developer

From: miki haiat mailto:miko5...@gmail.com>>
Sent: Thursday, August 22, 2019 11:03 AM
To: Eyal Pe'er mailto:eyal.p...@startapp.com>>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you double check that the kafka instance is up ?
The code looks fine.


Best,

Miki

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi,
I'm trying to consume events using Apache Flink.
The code is very basic, trying to connect the topic split words by space and 
print it to the console. Kafka version is 0.9.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka servers:9092...");
props.setProperty("group.id", "flinkPOC");
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic",
new SimpleStringSchema(), props);

DataStream<String> dataStream = env.addSource(consumer);

DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
wordDataStream.print();
env.execute("Word Split");

}

public static class Splitter implements FlatMapFunction<String, String> {

public void flatMap(String sentence, Collector<String> out) throws
Exception {

for (String word : sentence.split(" ")) {
out.collect(word);
}
}

}
}

The app does not print anything to the screen (although I produced events to 
Kafka).
I tried to skip the Splitter FlatMap function, but still nothing happens. Kafka
does not require SSL or any other kind of authentication.
This is the error that I found in the logs:
2019-08-20 14:36:17,654 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source -> Flat Map -> Sink: Print to Std. Out (1/1) 
(02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching 
topic metadata

The Kafka topic has only one partition, so the topic metadata is supposed to be
very basic.
I ran Kafka and Flink locally in order to eliminate network-related issues,
but the issue persists. So my assumption is that I’m doing something wrong…
Did you encounter such an issue? Does someone have different code for consuming
Kafka events?
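One thing worth ruling out: "Timeout expired while fetching topic metadata" usually means the client never manages to reach the brokers it was given, for example because the address in bootstrap.servers (or the address the broker advertises back) is not resolvable from where the job actually runs, such as inside a Docker container. A minimal reachability check, run from that same machine or container, sketched with placeholder values:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachabilityCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder address -- use the exact host:port from bootstrap.servers.
        String host = "kafka-host";
        int port = 9092;

        try (Socket socket = new Socket()) {
            // Fails quickly if the broker address cannot be reached from here.
            socket.connect(new InetSocketAddress(host, port), 5000);
            System.out.println("TCP connection to " + host + ":" + port + " succeeded");
        }
    }
}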

Best regards
Eyal Peer / Data Platform Developer