Flink StateBackend configuration question

2019-09-02 Thread ????????
Hi,

I set the StateBackend with env.setStateBackend(new
RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints", true)), but the checkpoint for my SQL window job shows n/a.

flink-conf.yaml contains:
state.checkpoints.dir: hdfs://host51:9000/flink/flink-checkpoints

How should a SQL window job be configured to use the RocksDBStateBackend?
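
For reference, a minimal sketch of how a RocksDB state backend and checkpointing are typically wired together for such a job; the path and the checkpoint interval below are illustrative, not taken from the original message:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend; the second argument enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs://host51:9000/flink/checkpoints", true));

        // checkpointing must also be enabled for checkpoints to be triggered at all
        env.enableCheckpointing(60_000); // every 60 seconds

        // ... build the Table/SQL window job on top of this environment ...
        env.execute("rocksdb-backend-example");
    }
}
```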

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Zhu Zhu
1s looks good to me.
And I think the conclusion about when a user should override the delay is
worth documenting.

Thanks,
Zhu Zhu

Steven Wu wrote on Tue, Sep 3, 2019 at 4:42 AM:

> 1s sounds like a good tradeoff to me.
>
> On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann  wrote:
>
>> Thanks a lot for all your feedback. I see there is a slight tendency
>> towards having a non zero default delay so far.
>>
>> However, Yu has brought up some valid points. Maybe I can shed some light
>> on a).
>>
>> Before FLINK-9158 we set the default delay to 10s because Flink did not
>> support queued scheduling which meant that if one slot was missing/still
>> being occupied, then Flink would fail right away with
>> a NoResourceAvailableException. In order to prevent this we added the
>> delay. This also covered the case when the job was failing because of an
>> overloaded external system.
>>
>> When we finished FLIP-6, we thought that we could improve the user
>> experience by decreasing the default delay to 0s because all Flink related
>> problems (slot still occupied, slot missing because of reconnecting TM)
>> could be handled by the default slot request time out which allowed the
>> slots to become ready after the scheduling was kicked off. However, we did
>> not properly take the case of overloaded external systems into account.
>>
>> For b) I agree that any default value should be properly documented. This
>> was clearly an oversight when FLINK-9158 has been merged. Moreover, I
>> believe that there won't be the solve it all default value. There are
>> always cases where one needs to adapt it to ones needs. But this is ok. The
>> goal should be to find the default value which works for most cases.
>>
>> So maybe the middle ground between 10s and 0s could be a solution.
>> Setting the default restart delay to 1s should prevent restart storms
>> caused by overloaded external systems and still be fast enough to not slow
>> down recoveries noticeably in most cases. If one needs a super fast
>> recovery, then one should set the delay value to 0s. If one requires a
>> longer delay because of a particular infrastructure, then one needs to
>> change the value too. What do you think?
>>
>> Cheers,
>> Till
>>
>> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>>
>>> -1 on increasing the default delay to non-zero, for the reasons below:
>>>
>>> a) I could see some concerns about setting the delay to zero in the very
>>> original JIRA (FLINK-2993
>>> ) but later on in
>>> FLINK-9158  we still
>>> decided to make the change, so I'm wondering whether the decision also came
>>> from any customer requirement? If so, how could we judge whether one
>>> requirement override the other?
>>>
>>> b) There could be valid reasons for both default values depending on
>>> different use cases, as well as relative work around (like based on latest
>>> policy, setting the config manually to 10s could resolve the problem
>>> mentioned), and from former replies to this thread we could see users have
>>> already taken actions. Changing it back to non-zero again won't affect such
>>> users but might cause surprises to those depending on 0 as default.
>>>
>>> Last but not least, no matter what decision we make this time, I'd
>>> suggest to make it final and document in our release note explicitly.
>>> Checking the 1.5.0 release note [1] [2] it seems we didn't mention about
>>> the change on default restart delay and we'd better learn from it this
>>> time. Thanks.
>>>
>>> [1]
>>> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>>>
 +1 on what Zhu Zhu said.

 We also override the default to 10 s.

 On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:

> In our production, we usually override the restart delay to be 10 s.
> We once encountered cases that external services are overwhelmed by
> reconnections from frequent restarted tasks.
> As a safer though not optimized option, a default delay larger than 0
> s is better in my opinion.
>
>
> 未来阳光 <2217232...@qq.com> wrote on Fri, Aug 30, 2019 at 10:23 PM:
>
>> Hi,
>>
>>
>> I think it's better to increase the default value. +1
>>
>>
>> Best.
>>
>>
>>
>>
>> -- Original message --
>> From: "Till Rohrmann";
>> Sent: Friday, Aug 30, 2019, 10:07 PM
>> To: "dev"; "user";
>> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>>
>>
>>
>> Hi everyone,
>>
>> I wanted to reach out to you and ask whether decreasing the default
>> delay
>> to `0 s` for the fixed delay restart strategy [1] is causing trouble.
>> A
>> user reported that he would like to 

Flink SQL time questions

2019-09-02 Thread hb
Using the Kafka ConnectorDescriptor to read JSON-format data from Kafka and create a Table:


```
  ...


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


  schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
  new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)
```
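
For context, a sketch of how such a descriptor is usually wired into a complete table source registration; it assumes a StreamTableEnvironment named tableEnv and the Schema object (schema) built as in the snippet above, and the connector settings are illustrative assumptions, not taken from the original message:

```java
// Kafka connector + JSON format + the schema from the snippet above (descriptor API of Flink 1.8/1.9)
tableEnv
    .connect(new Kafka()
        .version("universal")
        .topic("input-topic")                              // assumed topic name
        .property("bootstrap.servers", "localhost:9092")   // assumed broker address
        .startFromEarliest())
    .withFormat(new Json().failOnMissingField(false).deriveSchema())
    .withSchema(schema)        // the Schema containing the _proctime / _rowtime fields
    .inAppendMode()
    .registerTableSource("events");
```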


Question 1: The generated _proctime processing-time field is displayed in the UTC time zone in the results. How can it be adjusted to the +8 time zone?
Question 2: How can the eventTime event-time field support the Long type?


The record I write to Kafka is {"eventTime": 10, "id":1,"name":"hb"}, and it reports a type problem for the eventTime field.

understanding task manager logs

2019-09-02 Thread Vishwas Siravara
Hi guys,
I am using Flink 1.7.2 and my application consumes from a Kafka topic and
publishes to another Kafka topic that is in its own Kafka environment
running a different Kafka version. I am using FlinkKafkaConsumer010 from
this dependency:
"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion

In the task manager log I see these lines:

2019-09-02 02:57:59,840 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin  -
Successfully logged in.
2019-09-02 02:57:59,841 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
thread started.
2019-09-02 02:57:59,842 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid
starting at: Mon Sep 02 02:57:59 GMT 2019
2019-09-02 02:57:59,843 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT
expires: Mon Sep 02 12:57:59 GMT 2019
2019-09-02 02:57:59,843 INFO
org.apache.kafka.common.security.kerberos.KerberosLogin   -
[Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh
sleeping until: Mon Sep 02 11:14:13 GMT 2019
2019-09-02 02:57:59,919 WARN
org.apache.kafka.clients.consumer.ConsumerConfig  - The
configuration 'zookeeper.connect' was supplied but isn't a known
config.
2019-09-02 02:57:59,919 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
version : 0.10.2.0
2019-09-02 02:57:59,919 INFO
org.apache.kafka.common.utils.AppInfoParser   - Kafka
commitId : 576d93a8dc0cf421

Here you can see the Kafka version is 0.10.2.0. Is this the version the
broker is running, or is this coming from Flink? I have forced the
kafka-clients version

to be 2.2.0

"org.apache.kafka" % "kafka-clients" % "2.2.0" force()

I also don't see 0.10.2.0 in the dependency tree of my build.

Also will flink-connector-kafka-0.10 work for kafka versions > 1.0 ?
What should I do if the consumer broker and producer broker are on
different versions of kafka ?


Thanks,

Vishwas




question

2019-09-02 Thread ????????
Hello,
My problem is the mapping between the Flink table format and the table schema.
The input data is similar to the following JSON format:

{ "id": "123", "serial": "6b0c2d26", "msg": { "f1": "5677" } }

The format code for the TableSource is as follows:

new Json().schema(Types.ROW(
    new String[] { "id", "serial", "msg" },
    new TypeInformation<?>[] {
        Types.STRING(),
        Types.STRING(),
        Types.ROW(new String[] { "f1" },
                  new TypeInformation<?>[] { Types.STRING() })
    }));


The schema part of the TableSource is as follows:

Schema schema = new Schema();
schema.field("id", Types.STRING());
schema.field("serial", Types.STRING());


I don't know how to define the f1 field of msg in the schema. I tried
schema.field("f1", Types.STRING()) before, but it reports an error. What is
the correct method?
The SQL that needs to run correctly is:
select id, serial, f1 from table;
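
One possible way to declare the nested field, sketched here as an assumption rather than a confirmed answer from the thread, is to mirror the format's ROW type in the schema:

```java
Schema schema = new Schema();
schema.field("id", Types.STRING());
schema.field("serial", Types.STRING());
// declare "msg" as a nested ROW, matching the Json format above,
// so that its f1 field is addressable from SQL
schema.field("msg", Types.ROW(
        new String[] {"f1"},
        new TypeInformation<?>[] {Types.STRING()}));
```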


My Flink version is 1.8.1, using the Flink Table & SQL API.


Thanks.

Re: kinesis table connector support

2019-09-02 Thread Bowen Li
@Fanbin, I don't think there's one yet. Feel free to create a ticket and
submit a PR for it

On Mon, Sep 2, 2019 at 8:13 AM Biao Liu  wrote:

> Hi Fanbin,
>
> I'm not familiar with table module. Maybe someone else could help.
>
> @jincheng sun 
> Do you know if there is any plan for a Kinesis table connector?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Sat, 24 Aug 2019 at 02:26, Fanbin Bu  wrote:
>
>> Hi,
>>
>> Looks like Flink table connectors do not include `kinesis`. (only
>> FileSystem, Kafka, ES) see
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#table-connectors
>> .
>> I also found some examples for Kafka:
>> https://eventador.io/blog/flink_table_and_sql_api_with_apache_flink_16/.
>> I'm wondering is there such a thing for kinesis also.
>>
>> Is there any plan to support this in the future? Otherwise, what needs to
>> be done if we want to implement it on my own.
>>
>> Basically, I have a kinesis stream that emits json string data and I
>> would like to use the Flink Table/SQL API to do the streaming/batch processing.
>> Currently, I'm using DataStream API which is not as flexible.
>>
>> Any help would be appreciated.
>>
>> Thanks,
>> Fanbin
>>
>


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Steven Wu
1s sounds like a good tradeoff to me.

On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann  wrote:

> Thanks a lot for all your feedback. I see there is a slight tendency
> towards having a non zero default delay so far.
>
> However, Yu has brought up some valid points. Maybe I can shed some light
> on a).
>
> Before FLINK-9158 we set the default delay to 10s because Flink did not
> support queued scheduling which meant that if one slot was missing/still
> being occupied, then Flink would fail right away with
> a NoResourceAvailableException. In order to prevent this we added the
> delay. This also covered the case when the job was failing because of an
> overloaded external system.
>
> When we finished FLIP-6, we thought that we could improve the user
> experience by decreasing the default delay to 0s because all Flink related
> problems (slot still occupied, slot missing because of reconnecting TM)
> could be handled by the default slot request time out which allowed the
> slots to become ready after the scheduling was kicked off. However, we did
> not properly take the case of overloaded external systems into account.
>
> For b) I agree that any default value should be properly documented. This
> was clearly an oversight when FLINK-9158 has been merged. Moreover, I
> believe that there won't be the solve it all default value. There are
> always cases where one needs to adapt it to ones needs. But this is ok. The
> goal should be to find the default value which works for most cases.
>
> So maybe the middle ground between 10s and 0s could be a solution. Setting
> the default restart delay to 1s should prevent restart storms caused by
> overloaded external systems and still be fast enough to not slow down
> recoveries noticeably in most cases. If one needs a super fast recovery,
> then one should set the delay value to 0s. If one requires a longer delay
> because of a particular infrastructure, then one needs to change the value
> too. What do you think?
>
> Cheers,
> Till
>
> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>
>> -1 on increasing the default delay to non-zero, for the reasons below:
>>
>> a) I could see some concerns about setting the delay to zero in the very
>> original JIRA (FLINK-2993
>> ) but later on in
>> FLINK-9158  we still
>> decided to make the change, so I'm wondering whether the decision also came
>> from any customer requirement? If so, how could we judge whether one
>> requirement override the other?
>>
>> b) There could be valid reasons for both default values depending on
>> different use cases, as well as relative work around (like based on latest
>> policy, setting the config manually to 10s could resolve the problem
>> mentioned), and from former replies to this thread we could see users have
>> already taken actions. Changing it back to non-zero again won't affect such
>> users but might cause surprises to those depending on 0 as default.
>>
>> Last but not least, no matter what decision we make this time, I'd
>> suggest to make it final and document in our release note explicitly.
>> Checking the 1.5.0 release note [1] [2] it seems we didn't mention about
>> the change on default restart delay and we'd better learn from it this
>> time. Thanks.
>>
>> [1]
>> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>>
>> Best Regards,
>> Yu
>>
>>
>> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>>
>>> +1 on what Zhu Zhu said.
>>>
>>> We also override the default to 10 s.
>>>
>>> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>>>
 In our production, we usually override the restart delay to be 10 s.
 We once encountered cases that external services are overwhelmed by
 reconnections from frequent restarted tasks.
 As a safer though not optimized option, a default delay larger than 0 s
 is better in my opinion.


 未来阳光 <2217232...@qq.com> wrote on Fri, Aug 30, 2019 at 10:23 PM:

> Hi,
>
>
> I think it's better to increase the default value. +1
>
>
> Best.
>
>
>
>
> -- Original message --
> From: "Till Rohrmann";
> Sent: Friday, Aug 30, 2019, 10:07 PM
> To: "dev"; "user";
> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>
>
>
> Hi everyone,
>
> I wanted to reach out to you and ask whether decreasing the default
> delay
> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
> user reported that he would like to increase the default value because
> it
> can cause restart storms in case of systematic faults [2].
>
> The downside of increasing the default delay would be a slightly
> increased
> restart time if this config option is not explicitly set.
>
> [1] 

Re: End of Window Marker

2019-09-02 Thread Eduardo Winpenny Tejedor
Hi all,

I'll illustrate my approach with an example as it is definitely
unorthodox. Here's some sample code. It works for me...I hope there
are no (obvious) flaws!

// myStream should be a stream of objects associated with a timestamp.
// The idea is to create a Flink app that sends each object to Kafka with the
// ability of also sending an extra end-of-stream message after all events
// for the same associated timestamp have been sent.
// Note: the generic type parameters were stripped in transit; they are restored
// below, with MyEvent as a placeholder for the actual payload type.
final SingleOutputStreamOperator<Tuple2<Long, MyEvent>> myStream =
        null; // replace with actual stream
final String KAFKA_SINK_TOPIC = "OUTPUT_TOPIC";
final SinkFunction<Tuple2<Long, MyEvent>> singleObjectKafkaSink = null; // replace with...
// needs to take care of mapping a timestamp into an end-of-stream marker record
final KeyedSerializationSchema<Tuple2<Long, Integer>> serializationSchema = null; // replace with...
final Properties producerProperties = null; // replace with...
final int kafkaProducerPoolSize = 6; // for example...

// sinks every event of a window
myStream.addSink(singleObjectKafkaSink);

// sends one end-of-stream message per Kafka partition
myStream
    .map(item -> item._1()) // keep only the time
    .keyBy(item -> 1) // forces every event to go to the same task
    .process(new KeyedProcessFunction<Integer, Long, Tuple2<Long, Integer>>() {

        private KafkaProducer<byte[], byte[]> kafkaProducer;

        @Override
        public void open(Configuration parameters) throws Exception {
            kafkaProducer = new KafkaProducer<>(producerProperties);
        }

        @Override
        public void close() throws Exception {
            if (kafkaProducer != null) {
                kafkaProducer.close();
            }
        }

        @Override
        public void processElement(Long timestamp, Context ctx,
                Collector<Tuple2<Long, Integer>> out) throws Exception {
            // timer coalescing avoids firing more than once per timestamp
            ctx.timerService().registerEventTimeTimer(timestamp);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                Collector<Tuple2<Long, Integer>> out) throws Exception {
            // Flink guarantees the timer will only fire after the normal sink
            // has finished producing messages.
            // Send one event per partition downstream.
            kafkaProducer.partitionsFor("OUTPUT_TOPIC").forEach(partitionInfo ->
                    out.collect(new Tuple2<>(timestamp, partitionInfo.partition())));
        }
    }).addSink(
        new FlinkKafkaProducer<Tuple2<Long, Integer>>(KAFKA_SINK_TOPIC, serializationSchema, producerProperties,
            Optional.of(new FlinkKafkaPartitioner<Tuple2<Long, Integer>>() {
                @Override
                public int partition(Tuple2<Long, Integer> record, byte[] key,
                        byte[] value, String targetTopic, int[] partitions) {
                    return record._2(); // send to the partition it was designed to be sent to
                }
            }), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducerPoolSize));


On Mon, Sep 2, 2019 at 5:07 PM Padarn Wilson  wrote:
>
> Hi Fabian,
>
> > but each partition may only be written by a single task
>
> Sorry I think I misunderstand something here then: If I have a topic with one 
> partition, but multiple sink tasks (or parallelism > 1).. this means the data 
> must all be shuffled to the single task writing that partition?
>
> Padarn
>
> On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske  wrote:
>>
>> Hi Padarn,
>>
>> Regarding your throughput concerns: A sink task may write to multiple 
>> partitions, but each partition may only be written by a single task.
>>
>> @Eduardo: Thanks for sharing your approach! Not sure if I understood it 
>> correctly, but I think that the approach does not guarantee that all results 
>> of a window are emitted before the end-of-window marker is written.
>> Since the sink operator and the single-task-operator are separate operators, 
>> the output records might get stuck (or be buffered) in one of the sink 
>> tasks and the single-task would still emit an end-of-window marker record 
>> because it doesn't know about the sink task.
>>
>> Best,
>> Fabian
>>
>> Am Do., 29. Aug. 2019 um 18:42 Uhr schrieb Eduardo Winpenny Tejedor 
>> :
>>>
>>> Hi,
>>>
>>> I'll chip in with an approach I'm trying at the moment that seems to work, 
>>> and I say seems because I'm only running this on a personal project.
>>>
>>> Personally, I don't have anything against end-of-message markers per 
>>> partition, Padarn you seem to not prefer this option as it overloads the 
>>> meaning of the output payload. My approach is equally valid when producing 
>>> watermarks/end-of-message markers on a side output though.
>>>
>>> The main problem of both approaches is knowing when the window has finished 
>>> across all partitions without having to wait for the start of the next 
>>> window.
>>>
>>> I've taken the approach of sending all output messages of the window to 1. 
>>> the sink but also 2. a single task operator. The single task operator 
>>> registers an event time based timer at the time of the end of the window. 
>>> You have the confidence of the task's timer triggering only 

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Till Rohrmann
Thanks a lot for all your feedback. I see there is a slight tendency
towards having a non zero default delay so far.

However, Yu has brought up some valid points. Maybe I can shed some light
on a).

Before FLINK-9158 we set the default delay to 10s because Flink did not
support queued scheduling which meant that if one slot was missing/still
being occupied, then Flink would fail right away with
a NoResourceAvailableException. In order to prevent this we added the
delay. This also covered the case when the job was failing because of an
overloaded external system.

When we finished FLIP-6, we thought that we could improve the user
experience by decreasing the default delay to 0s because all Flink related
problems (slot still occupied, slot missing because of reconnecting TM)
could be handled by the default slot request time out which allowed the
slots to become ready after the scheduling was kicked off. However, we did
not properly take the case of overloaded external systems into account.

For b) I agree that any default value should be properly documented. This
was clearly an oversight when FLINK-9158 was merged. Moreover, I
believe that there won't be a solve-it-all default value. There are
always cases where one needs to adapt it to one's needs. But this is OK. The
goal should be to find the default value which works for most cases.

So maybe the middle ground between 10s and 0s could be a solution. Setting
the default restart delay to 1s should prevent restart storms caused by
overloaded external systems and still be fast enough to not slow down
recoveries noticeably in most cases. If one needs a super fast recovery,
then one should set the delay value to 0s. If one requires a longer delay
because of a particular infrastructure, then one needs to change the value
too. What do you think?

Cheers,
Till
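
For reference, a sketch of how a single job can override the restart delay explicitly; the attempt count and delay below are illustrative. The same can be set cluster-wide in flink-conf.yaml via restart-strategy: fixed-delay and restart-strategy.fixed-delay.delay: 10 s.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// fixed-delay restarts: up to 10 attempts, waiting 10 seconds between them
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(10)));
```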

On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:

> -1 on increasing the default delay to non-zero, for the reasons below:
>
> a) I could see some concerns about setting the delay to zero in the very
> original JIRA (FLINK-2993
> ) but later on in
> FLINK-9158  we still
> decided to make the change, so I'm wondering whether the decision also came
> from any customer requirement? If so, how could we judge whether one
> requirement override the other?
>
> b) There could be valid reasons for both default values depending on
> different use cases, as well as relative work around (like based on latest
> policy, setting the config manually to 10s could resolve the problem
> mentioned), and from former replies to this thread we could see users have
> already taken actions. Changing it back to non-zero again won't affect such
> users but might cause surprises to those depending on 0 as default.
>
> Last but not least, no matter what decision we make this time, I'd suggest
> to make it final and document in our release note explicitly. Checking the
> 1.5.0 release note [1] [2] it seems we didn't mention about the change on
> default restart delay and we'd better learn from it this time. Thanks.
>
> [1]
> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>
> Best Regards,
> Yu
>
>
> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>
>> +1 on what Zhu Zhu said.
>>
>> We also override the default to 10 s.
>>
>> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>>
>>> In our production, we usually override the restart delay to be 10 s.
>>> We once encountered cases that external services are overwhelmed by
>>> reconnections from frequent restarted tasks.
>>> As a safer though not optimized option, a default delay larger than 0 s
>>> is better in my opinion.
>>>
>>>
>>> 未来阳光 <2217232...@qq.com> wrote on Fri, Aug 30, 2019 at 10:23 PM:
>>>
 Hi,


 I think it's better to increase the default value. +1


 Best.




 -- Original message --
 From: "Till Rohrmann";
 Sent: Friday, Aug 30, 2019, 10:07 PM
 To: "dev"; "user";
 Subject: [SURVEY] Is the default restart delay of 0s causing problems?



 Hi everyone,

 I wanted to reach out to you and ask whether decreasing the default
 delay
 to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
 user reported that he would like to increase the default value because
 it
 can cause restart storms in case of systematic faults [2].

 The downside of increasing the default delay would be a slightly
 increased
 restart time if this config option is not explicitly set.

 [1] https://issues.apache.org/jira/browse/FLINK-9158
 [2] https://issues.apache.org/jira/browse/FLINK-11218

 Cheers,
 Till
>>>
>>>


Re: End of Window Marker

2019-09-02 Thread Padarn Wilson
Hi Fabian,

> but each partition may only be written by a single task

Sorry I think I misunderstand something here then: If I have a topic with
one partition, but multiple sink tasks (or parallelism > 1).. this means
the data must all be shuffled to the single task writing that partition?

Padarn

On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske  wrote:

> Hi Padarn,
>
> Regarding your throughput concerns: A sink task may write to multiple
> partitions, but each partition may only be written by a single task.
>
> @Eduardo: Thanks for sharing your approach! Not sure if I understood it
> correctly, but I think that the approach does not guarantee that all
> results of a window are emitted before the end-of-window marker is written.
> Since the sink operator and the single-task-operator are separate
> operators, the output records might get stuck (or be buffered) in one of
> the sink tasks and the single-task would still emit an end-of-window marker
> record because it doesn't know about the sink task.
>
> Best,
> Fabian
>
> Am Do., 29. Aug. 2019 um 18:42 Uhr schrieb Eduardo Winpenny Tejedor <
> eduardo.winpe...@gmail.com>:
>
>> Hi,
>>
>> I'll chip in with an approach I'm trying at the moment that seems to
>> work, and I say seems because I'm only running this on a personal project.
>>
>> Personally, I don't have anything against end-of-message markers per
>> partition, Padarn you seem to not prefer this option as it overloads the
>> meaning of the output payload. My approach is equally valid when producing
>> watermarks/end-of-message markers on a side output though.
>>
>> The main problem of both approaches is knowing when the window has
>> finished across all partitions without having to wait for the start of the
>> next window.
>>
>> I've taken the approach of sending all output messages of the window to
>> 1. the sink but also 2. a single task operator. The single task operator
>> registers an event time based timer at the time of the end of the window.
>> You have the confidence of the task's timer triggering only once at the
>> right time because all the post-window watermarks go through to the same
>> task. At that point I make the task send an end-of-message marker to every
>> partition. I don't need to send the count because Kafka messages are
>> ordered. AND IF you prefer to not overload the semantic of your original
>> Kafka topic you can post the message to a separate location of your choice.
>>
>> While this does mean that the end of marker message only gets sent
>> through once the window has finished across all substreams (as opposed to
>> per stream), it does mean you don't need to wait for the next window to
>> start AND the watermark gap between substreams should never grow that much
>> anyway.
>>
>> This approach should be particularly useful when the number of partitions
>> or keying mechanism is different between the input and output topics.
>>
>> Hopefully that doesn't sound like a terrible idea.
>>
>> eduardo
>>
>>
>>
>>
>> On Wed, 28 Aug 2019, 02:54 Padarn Wilson,  wrote:
>>
>>> Hi again Fabian,
>>>
>>> Thanks for pointing this out to me. In my case there is no need for
>>> keyed writing - but I do wonder if having each kafka task write only to a
>>> single partition would significantly affect performance.
>>>
>>> Actually now that I think about it, the approach to just wait for the
>>> first records of the next window is also subject to the problem you mention
>>> above: a producer lagging behind the rest could end up with a partition
>>> containing element out of ‘window order’.
>>>
>>> I was also thinking this problem is very similar to that of checkpoint
>>> barriers. I intended to dig into the details of the exactly once Kafka sink
>>> for some inspiration.
>>>
>>> Padarn
>>>
>>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske 
>>> wrote:
>>>
 Hi Padarn,

 Yes, this is quite tricky.
 The "problem" with watermarks is that you need to consider how you
 write to Kafka.
 If your Kafka sink writes to keyed Kafka stream (each Kafka partition
 is written by multiple producers), you need to broadcast the watermarks to
 each partition, i.e., each partition would receive watermarks from each
 parallel sink task. So in order to reason about the current watermark of a
 partition, you need to observe them and take the minimum WM across all
 current sink task WMs.
 Things become much easier, if each partition is only written by a
 single task but this also means that data is not key-partitioned in Kafka.
 In that case, the sink task only needs to write a WM message to each of
 its assigned partitions.

 Hope this helps,
 Fabian


 Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson <
 pad...@gmail.com>:

> Hi Fabian, thanks for your input
>
> Exactly. Actually my first instinct was to see if it was possible to
> publish the watermarks somehow - my initial idea was to insert regular

Re: kinesis table connector support

2019-09-02 Thread Biao Liu
Hi Fanbin,

I'm not familiar with table module. Maybe someone else could help.

@jincheng sun 
Do you know if there is any plan for a Kinesis table connector?

Thanks,
Biao /'bɪ.aʊ/



On Sat, 24 Aug 2019 at 02:26, Fanbin Bu  wrote:

> Hi,
>
> Looks like Flink table connectors do not include `kinesis`. (only
> FileSystem, Kafka, ES) see
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#table-connectors
> .
> I also found some examples for Kafka:
> https://eventador.io/blog/flink_table_and_sql_api_with_apache_flink_16/.
> I'm wondering is there such a thing for kinesis also.
>
> Is there any plan to support this in the future? Otherwise, what needs to
> be done if we want to implement it on my own.
>
> Basically, I have a kinesis stream that emits json string data and I would
> like to use the Flink Table/SQL API to do the streaming/batch processing.
> Currently, I'm using DataStream API which is not as flexible.
>
> Any help would be appreciated.
>
> Thanks,
> Fanbin
>


Re: checkpoint failure suddenly even state size is into 10 mb around

2019-09-02 Thread Biao Liu
Hi Sushant,

Your screenshot shows the checkpoint expired, which means the checkpoint did not
finish in time.
I guess the reason is that heavy back pressure blocked both the data and the barriers,
but I can't tell why there was such heavy back pressure.

If this scenario happens again, you could pay more attention to the tasks
which cause this heavy back pressure.
The task manager log, GC log, and some other tools like jstack might help.

Thanks,
Biao /'bɪ.aʊ/



On Fri, 23 Aug 2019 at 15:27, Sushant Sawant 
wrote:

> Hi all,
> I'm facing two issues which I believe are correlated, though.
> 1. Kafka source shows high back pressure.
> 2. Sudden checkpoint failure for entire day until restart.
>
> My job does following thing,
> a. Read from Kafka
> b. Asyncio to external system
> c. Dumping in Cassandra, Elasticsearch
>
> Checkpointing is using file system.
> This flink job is proven under high load,
> around 5000/sec throughput.
> But recently we scaled down parallelism since, there wasn't any load in
> production and these issues started.
>
> Please find the status shown by flink dashboard.
> The github folder contains image where there was high back pressure and
> checkpoint failure
>
> https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
> and  after restart, "everything is fine" images in this folder,
>
> https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing
>
> --
> Could anyone point me towards direction what would have went wrong/
> trouble shooting??
>
>
> Thanks & Regards,
> Sushant Sawant
>


Kinesis stream and serialization schemas

2019-09-02 Thread Yoandy Rodríguez
Hi everyone,

As I've mentioned in previous emails, we're currently exploring Flink as a
substitute for some in-house products. One of these products sends JSON
data to a Kinesis Data Stream; another product processes the records after
some time.

We've tried to set up the Kinesis producer like this:

FlinkKinesisProducer<String> kinesis =
    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);

But on the other application we kept getting some weird binary data, so
right now we're using the following

FlinkKinesisProducer<String> kinesis =
    new FlinkKinesisProducer<>(new KinesisSerializationSchema<String>() {
        private static final long serialVersionUID = -3435842401751414891L;

        @Override
        public ByteBuffer serialize(String element) {
            return ByteBuffer.wrap(element.getBytes());
        }

        @Override
        public String getTargetStream(String element) {
            return null;
        }
    }, producerConfig);

But that's not working either. What are we doing wrong?.

Thanks in advance

-- 
Best Regards
Yoandy Rodríguez



How to produce data to Kafka using the forward strategy

2019-09-02 Thread gaofeilong198...@163.com
My Kafka topic has 10 partitions. I want my Flink program to produce data to Kafka using the forward strategy rather than rebalance. Which of the following should I use?
ds.map(line => someFunction).setParallelism(10).addSink(myKafkaProducer)
or
ds.map(line => someFunction).addSink(myKafkaProducer).setParallelism(10)
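
For reference, a sketch (not from the thread) of the point behind the question: Flink only keeps the forward connection between two operators when they have the same parallelism, so the sink's parallelism has to match the map's. The Java form below is illustrative; ds, someFunction, and myKafkaProducer are taken from the question.

```java
ds.map(line -> someFunction(line))
  .setParallelism(10)     // map runs with 10 parallel subtasks
  .addSink(myKafkaProducer)
  .setParallelism(10);    // same parallelism keeps the forward (non-rebalancing) connection
```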



--
高飞龙

gaofeilong198...@163.com


Re: Is it possible to register a custom TypeInfoFactory without using an annotation?

2019-09-02 Thread Biao Liu
Hi,

Java supports customization of serializer/deserializer, see [1]. Could it
satisfy your requirement?

1. https://stackoverflow.com/questions/7290777/java-custom-serialization
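
For illustration, a minimal sketch of the plain-Java serialization hooks the linked answer describes; the class and its fields are hypothetical:

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MyPojo implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int count;

    // custom wire format used instead of the default field-by-field encoding
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        name = in.readUTF();
        count = in.readInt();
    }
}
```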

Thanks,
Biao /'bɪ.aʊ/



On Mon, 26 Aug 2019 at 16:34, 杨力  wrote:

> I'd like to provide a custom serializer for a POJO class. But that class
> cannot be modified so it's not possible to add a @TypeInfo annotation to
> it. Are there any other ways to register one?
>


Window metadata removal

2019-09-02 Thread gil bl
Hi,

I'm interested in why metadata like WindowOperator and InternalTimer entries are being kept for the windowSize + allowedLateness period per pane. What is the purpose of keeping this data if no new events are expected to enter the pane? Is there any way this metadata can be released earlier?

Re: tumbling event time window , parallel

2019-09-02 Thread Fabian Hueske
I meant to not use Flink's built-in windows at all but implement your logic
in a KeyedProcessFunction.

So basically:
myDataStream.keyBy(...).process(new MyKeyedProcessFunction)
instead of:
myDataStream.keyBy(...).window(...).process(new MyWindowProcessFunction)
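
A minimal sketch of the kind of late-record check described further down in this thread ("compare the timestamp of each record with the current watermark"); the event type and the way late records are counted are illustrative:

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// counts how many records arrive with a timestamp behind the current watermark
public class LateRecordCheck<K, T> extends KeyedProcessFunction<K, T, T> {

    private transient long lateCount;

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long ts = ctx.timestamp();
        if (ts != null && ts < ctx.timerService().currentWatermark()) {
            lateCount++; // this record would be dropped by a window without allowed lateness
        }
        out.collect(value);
    }
}
```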

Am Mo., 2. Sept. 2019 um 15:29 Uhr schrieb Hanan Yehudai <
hanan.yehu...@radcom.com>:

> I'm not sure what you mean by using a process function and not a window process
> function, as the window operator takes in a window process function.
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Monday, August 26, 2019 1:33 PM
> *To:* Hanan Yehudai 
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> I would use a regular ProcessFunction, not a WindowProcessFunction.
>
>
>
> The final WM depends on how the records were partitioned at the watermark
> assigner (and the assigner itself).
>
> AFAIK, the distribution of files to source reader tasks is not
> deterministic. Hence, the final WM changes from run to run.
>
>
>
> Fabian
>
>
>
> Am Mo., 26. Aug. 2019 um 12:16 Uhr schrieb Hanan Yehudai <
> hanan.yehu...@radcom.com>:
>
> You said “ You can use a custom ProcessFunction and compare the timestamp
> of each record with the current watermark.”.
>
>
>
> Does the window process function have all the events – even the ones that
> are dropped due to lateness?
> From what I understand, the “iterable” argument contains the records
> that were inserted into the window and NOT the ones dropped. Isn’t that
> correct?
>
>
>
>
>
> Also,
>
> when looking on Flink’s monitoring page  - for  the  watermarks  I see
> different values even after all my files were processed, which is
> something I would not expect
> I would expect that eventually   the WM will be the highest EVENT_TIME on
> my set of files..
>
>
>
>
>
> thanks
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Monday, August 26, 2019 12:38 PM
> *To:* Hanan Yehudai 
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> Hi,
>
>
>
> The paths of the files to read are distributed across all reader / source
> tasks and each task reads the files in order of their modification
> timestamp.
>
> The watermark generator is not aware of any files and just looks at the
> stream of records produced by the source tasks.
>
> You need to chose the WM generator strategy such that you minimize the
> number of late records.
>
>
>
> I'd recommend to first investigate how many late records you are dealing
> with.
>
> You can use a custom ProcessFunction and compare the timestamp of each
> record with the current watermark.
>
>
>
> AllowedLateness is also not a magical cure. It will just emit updates
> downstream, i.e., you need to remove the results that were updated by a
> more complete result.
>
>
>
> Best, Fabian
>
>
>
> Am Mo., 26. Aug. 2019 um 10:21 Uhr schrieb Hanan Yehudai <
> hanan.yehu...@radcom.com>:
>
> The data  source is generated by an application that monitors some sort of
> sessions.
>
> With the EVENT_TIME column being the session end time .
>
>
>
> It is possible that the files will have out of order data , because of the
> async nature of the application writing  files.
>
>  While the EVENT_TIME is monotonically  increasing in general .  some
> lateness is possible. However ,I used *allowlateness*  on my stream
> and still got the inconsistencies
>
>
>
> Although the real life use case is generically reading files form a
> folder.  The testing  env has an already set of files in advanced -  these
>  should be read and produce the result.
>
>
>
> You mentioned the “right” order of the files.  Is it sorted by update time
> ?  when running in parallel, is it possible that 2 files will be read in
> parallel. And in case that the latter one is smaller.  The latest timestamp
> will  be handled first ?
>
>
>
>
>
> BTW I tried to use a ContinuousEventTimeTrigger  to make sure the window
> is calculated ?  and got the processing to trigger multiple times  so I’m
> not sure exactly how this type of trigger works..
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Monday, August 26, 2019 11:06 AM
> *To:* Hanan Yehudai 
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> Hi,
>
>
>
> Can you share a few more details about the data source?
>
> Are you continuously ingesting files from a folder?
>
>
>
> You are correct, that the parallelism should not affect the results, but
> there are a few things that can affect that:
>
> 1) non-determnistic keys
>
> 2) out-of-order data with inappropriate watermarks
>
>
>
> Note that watermark configuration for file ingests can be difficult and
> that you need to ensure that files are read in the "right" order.
>
> AFAIK, Flink's continuous file source uses the modification timestamp of
> files to determine the read order.
>
>
>
> Best, Fabian
>
>
>
> Am So., 25. Aug. 2019 um 19:32 Uhr schrieb Hanan Yehudai <
> hanan.yehu...@radcom.com>:
>
> I have an issue 

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-02 Thread Becket Qin
Hi Tony,

From the symptom it is not quite clear to me what may cause this issue.
Supposedly the TransactionCoordinator is independent of the active
controller, so bouncing the active controller should not have special
impact on the transactions (at least not every time). If this is stably
reproducible, is it possible to turn on debug level logging
on kafka.coordinator.transaction.TransactionCoordinator to see what does
the broker say?

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei  wrote:

> Hi,
>
> Has anyone run into the same problem? I have updated my producer
> transaction timeout to 1.5 hours,
> but the problem sill happened when I restarted broker with active
> controller. It might not due to the
> problem that checkpoint duration is too long causing transaction timeout.
> I had no more clue to find out
> what's wrong about my kafka producer. Could someone help me please?
>
> Best,
> Tony Wei
>
> Fabian Hueske wrote on Fri, Aug 16, 2019 at 4:10 PM:
>
>> Hi Tony,
>>
>> I'm sorry I cannot help you with this issue, but Becket (in CC) might
>> have an idea what went wrong here.
>>
>> Best, Fabian
>>
>> Am Mi., 14. Aug. 2019 um 07:00 Uhr schrieb Tony Wei <
>> tony19920...@gmail.com>:
>>
>>> Hi,
>>>
>>> Currently, I was trying to update our kafka cluster with larger `
>>> transaction.max.timeout.ms`. The
>>> original setting is kafka's default value (i.e. 15 minutes) and I tried
>>> to set as 3 hours.
>>>
>>> When I was doing rolling-restart for my brokers, this exception came to
>>> me on the next checkpoint
>>> after I restarted the broker with active controller.
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint at
 org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
 java.util.concurrent.FutureTask.run(FutureTask.java:266) at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748) Caused by:
 org.apache.flink.util.FlinkRuntimeException: Committing one of transactions
 failed, logging first encountered failure at
 org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
 at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
 at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5
 more Caused by: org.apache.kafka.common.errors.InvalidTxnStateException:
 The producer attempted a transactional operation in an invalid state
>>>
>>>
>>> I have no idea why it happened, and I didn't find any error log from
>>> brokers. Does anyone have
>>> this exception before? How can I prevent from this exception when I
>>> tried to restart kafka cluster?
>>> Does this exception mean that I will lost data in some of these
>>> transactions?
>>>
>>> flink cluster version: 1.8.1
>>> kafka cluster version: 1.0.1
>>> flink kafka producer version: universal
>>> producer transaction timeout: 15 minutes
>>> checkpoint interval: 5 minutes
>>> number of concurrent checkpoint: 1
>>> max checkpoint duration before and after the exception occurred:  < 2
>>> minutes
>>>
>>> Best,
>>> Tony Wei
>>>
>>


Re: End of Window Marker

2019-09-02 Thread Fabian Hueske
Hi Padarn,

Regarding your throughput concerns: A sink task may write to multiple
partitions, but each partition may only be written by a single task.

@Eduardo: Thanks for sharing your approach! Not sure if I understood it
correctly, but I think that the approach does not guarantee that all
results of a window are emitted before the end-of-window marker is written.
Since the sink operator and the single-task-operator are separate
operators, the output records might get stuck (or be buffered) in one of
the sink tasks and the single-task would still emit an end-of-window marker
record because it doesn't know about the sink task.

Best,
Fabian

Am Do., 29. Aug. 2019 um 18:42 Uhr schrieb Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com>:

> Hi,
>
> I'll chip in with an approach I'm trying at the moment that seems to work,
> and I say seems because I'm only running this on a personal project.
>
> Personally, I don't have anything against end-of-message markers per
> partition, Padarn you seem to not prefer this option as it overloads the
> meaning of the output payload. My approach is equally valid when producing
> watermarks/end-of-message markers on a side output though.
>
> The main problem of both approaches is knowing when the window has
> finished across all partitions without having to wait for the start of the
> next window.
>
> I've taken the approach of sending all output messages of the window to 1.
> the sink but also 2. a single task operator. The single task operator
> registers an event time based timer at the time of the end of the window.
> You have the confidence of the task's timer triggering only once at the
> right time because all the post-window watermarks go through to the same
> task. At that point I make the task send an end-of-message marker to every
> partition. I don't need to send the count because Kafka messages are
> ordered. AND IF you prefer to not overload the semantic of your original
> Kafka topic you can post the message to a separate location of your choice.
>
> While this does mean that the end of marker message only gets sent through
> once the window has finished across all substreams (as opposed to per
> stream), it does mean you don't need to wait for the next window to start
> AND the watermark gap between substreams should never grow that much anyway.
>
> This approach should be particularly useful when the number of partitions
> or keying mechanism is different between the input and output topics.
>
> Hopefully that doesn't sound like a terrible idea.
>
> eduardo
>
>
>
>
> On Wed, 28 Aug 2019, 02:54 Padarn Wilson,  wrote:
>
>> Hi again Fabian,
>>
>> Thanks for pointing this out to me. In my case there is no need for keyed
>> writing - but I do wonder if having each kafka task write only to a single
>> partition would significantly affect performance.
>>
>> Actually now that I think about it, the approach to just wait for the
>> first records of the next window is also subject to the problem you mention
>> above: a producer lagging behind the rest could end up with a partition
>> containing element out of ‘window order’.
>>
>> I was also thinking this problem is very similar to that of checkpoint
>> barriers. I intended to dig into the details of the exactly once Kafka sink
>> for some inspiration.
>>
>> Padarn
>>
>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske  wrote:
>>
>>> Hi Padarn,
>>>
>>> Yes, this is quite tricky.
>>> The "problem" with watermarks is that you need to consider how you write
>>> to Kafka.
>>> If your Kafka sink writes to keyed Kafka stream (each Kafka partition is
>>> written by multiple producers), you need to broadcast the watermarks to
>>> each partition, i.e., each partition would receive watermarks from each
>>> parallel sink task. So in order to reason about the current watermark of a
>>> partition, you need to observe them and take the minimum WM across all
>>> current sink task WMs.
>>> Things become much easier, if each partition is only written by a single
>>> task but this also means that data is not key-partitioned in Kafka.
>>> In that case, the sink task only needs to write a WM message to each of
>>> its assigned partitions.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>>
>>> Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson <
>>> pad...@gmail.com>:
>>>
 Hi Fabian, thanks for your input

 Exactly. Actually my first instinct was to see if it was possible to
 publish the watermarks somehow - my initial idea was to insert regular
 watermark messages into each partition of the stream, but exposing this
 seemed quite troublesome.

 > In that case, you could have a ProcessFunction that is chained before
 the sink and which counts the window results per time slice and emits the
 result when the watermark passes to a side output.
 All side output messages are collected by a single task and can be
 published to a Kafka topic or even be made available via Queryable State.


Join with slow changing dimensions/ streams

2019-09-02 Thread Hanan Yehudai
I have a very common use case - enriching the stream with some dimension
tables.

e.g. the events stream has a SERVER_ID, and another file has the LOCATION
associated with each SERVER_ID (a dimension table CSV file).

In SQL I would simply join,
but when using the Flink Stream API, as far as I see, there are several options
and I wondered which would be optimal.


1. Use the JOIN operator, from the documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html).
This always has some time aspect to the join, unless I use an interval
join with a very large upper bound and associate the dimension stream record with
an old timestamp.

2. Just write a mapper function that gets the NAME from the dimension records,
which are preloaded when the MapFunction is initialized.

3. Use broadcast state – this way I can also listen to changes on the
dimension tables and do the actual join in the processElement function (see the sketch after this list).
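
A minimal sketch of option 3 using broadcast state; Event, ServerLocation, and EnrichedEvent (and their fields) are hypothetical placeholder types, not from the original message:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, String> locationsDesc =
        new MapStateDescriptor<>("server-locations", Types.STRING, Types.STRING);

BroadcastStream<ServerLocation> dimBroadcast = dimensionStream.broadcast(locationsDesc);

DataStream<EnrichedEvent> enriched = eventStream
    .connect(dimBroadcast)
    .process(new BroadcastProcessFunction<Event, ServerLocation, EnrichedEvent>() {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                Collector<EnrichedEvent> out) throws Exception {
            // look up the broadcast "dimension table" and enrich the event
            String location = ctx.getBroadcastState(locationsDesc).get(event.serverId);
            out.collect(new EnrichedEvent(event, location));
        }

        @Override
        public void processBroadcastElement(ServerLocation dim, Context ctx,
                Collector<EnrichedEvent> out) throws Exception {
            // every update of the dimension stream refreshes the broadcast state
            ctx.getBroadcastState(locationsDesc).put(dim.serverId, dim.location);
        }
    });
```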

What would be the most efficient way to do this from a memory and CPU consumption
perspective?

Or is there another , better way ?


Re: checkpoint failure in forever loop suddenly even state size less than 1 mb

2019-09-02 Thread Fabian Hueske
Hi Sushant,

It's hard to tell what's going on.
Maybe the thread pool of the async io operator is too small for the
ingested data rate?
This could cause the backpressure on the source and eventually also the
failing checkpoints.

Which Flink version are you using?

Best, Fabian


Am Do., 29. Aug. 2019 um 12:07 Uhr schrieb Sushant Sawant <
sushantsawant7...@gmail.com>:

> Hi Fabian,
> Sorry for one to one mail.
> Could you help me out with this m stuck with this issue over a week now.
>
> Thanks & Regards,
> Sushant Sawant
>
>
>
> On Tue, 27 Aug 2019, 15:23 Sushant Sawant, 
> wrote:
>
>> Hi, firstly thanks for replying.
>>
>> Here it is.. configuration related to checkpoint.
>>
>> CheckpointingMode checkpointMode =
>> CheckpointingMode.valueOf(‘AT_LEAST_ONCE’);
>>
>> Long checkpointInterval =
>> Long.valueOf(parameterMap.get(Checkpoint.CHECKPOINT_INTERVAL.getKey()));
>>
>> StateBackend sb=new FsStateBackend(file:);
>>
>> env.setStateBackend(sb);
>>
>> env.enableCheckpointing(30, checkpointMode);
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
>>
>> env.getCheckpointConfig().setCheckpointTimeout(18);
>>
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> Thanks & Regards,
>> Sushant Sawant
>>
>> On Tue, 27 Aug 2019, 14:09 pengcheng...@bonc.com.cn, <
>> pengcheng...@bonc.com.cn> wrote:
>>
>>> Hi, what's your checkpoint config?
>>>
>>> --
>>> pengcheng...@bonc.com.cn
>>>
>>>
>>> *From:* Sushant Sawant 
>>> *Date:* 2019-08-27 15:31
>>> *To:* user 
>>> *Subject:* Re: checkpoint failure suddenly even state size less than 1
>>> mb
>>> Hi team,
>>> Anyone for help/suggestion, now we have stopped all input in kafka,
>>> there is no processing, no sink but checkpointing is failing.
>>> Is it like once checkpoint fails it keeps failing forever until job
>>> restart.
>>>
>>> Help appreciated.
>>>
>>> Thanks & Regards,
>>> Sushant Sawant
>>>
>>> On 23 Aug 2019 12:56 p.m., "Sushant Sawant" 
>>> wrote:
>>>
>>> Hi all,
>>> m facing two issues which I believe are co-related though.
>>> 1. Kafka source shows high back pressure.
>>> 2. Sudden checkpoint failure for entire day until restart.
>>>
>>> My job does following thing,
>>> a. Read from Kafka
>>> b. Asyncio to external system
>>> c. Dumping in Cassandra, Elasticsearch
>>>
>>> Checkpointing is using file system.
>>> This flink job is proven under high load,
>>> around 5000/sec throughput.
>>> But recently we scaled down parallelism since, there wasn't any load in
>>> production and these issues started.
>>>
>>> Please find the status shown by flink dashboard.
>>> The github folder contains image where there was high back pressure and
>>> checkpoint failure
>>>
>>> https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
>>> and  after restart, "everything is fine" images in this folder,
>>>
>>> https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing
>>>
>>> --
>>> Could anyone point me towards direction what would have went wrong/
>>> trouble shooting??
>>>
>>>
>>> Thanks & Regards,
>>> Sushant Sawant
>>>
>>>
>>>
>>>


RE: tumbling event time window , parallel

2019-09-02 Thread Hanan Yehudai
I'm not sure what you mean by using a process function and not a window process
function, as the window operator takes in a window process function.

From: Fabian Hueske 
Sent: Monday, August 26, 2019 1:33 PM
To: Hanan Yehudai 
Cc: user@flink.apache.org
Subject: Re: tumbling event time window , parallel

I would use a regular ProcessFunction, not a WindowProcessFunction.

The final WM depends on how the records were partitioned at the watermark 
assigner (and the assigner itself).
AFAIK, the distribution of files to source reader tasks is not deterministic. 
Hence, the final WM changes from run to run.

Fabian

On Mon, Aug 26, 2019 at 12:16 PM, Hanan Yehudai
<hanan.yehu...@radcom.com> wrote:
You said “ You can use a custom ProcessFunction and compare the timestamp of 
each record with the current watermark.”.

Does the window process function have all the events – even the ones that are
dropped due to lateness?
From what I understand, the “iterable” argument contains the records that
were inserted into the window and NOT the ones dropped. Isn’t that correct?


Also,
When looking at Flink’s monitoring page for the watermarks, I see
different values even after all my files were processed, which is something I
would not expect.
I would expect that eventually the WM will be the highest EVENT_TIME in my
set of files.


thanks

From: Fabian Hueske <fhue...@gmail.com>
Sent: Monday, August 26, 2019 12:38 PM
To: Hanan Yehudai <hanan.yehu...@radcom.com>
Cc: user@flink.apache.org
Subject: Re: tumbling event time window , parallel

Hi,

The paths of the files to read are distributed across all reader / source tasks 
and each task reads the files in order of their modification timestamp.
The watermark generator is not aware of any files and just looks at the stream 
of records produced by the source tasks.
You need to chose the WM generator strategy such that you minimize the number 
of late records.

I'd recommend to first investigate how many late records you are dealing with.
You can use a custom ProcessFunction and compare the timestamp of each record 
with the current watermark.

AllowedLateness is also not a magical cure. It will just emit updates 
downstream, i.e., you need to remove the results that were updated by a more 
complete result.

Best, Fabian

On Mon, Aug 26, 2019 at 10:21 AM, Hanan Yehudai
<hanan.yehu...@radcom.com> wrote:
The data  source is generated by an application that monitors some sort of 
sessions.
With the EVENT_TIME column being the session end time .

It is possible that the files will have out of order data , because of the 
async nature of the application writing  files.
While the EVENT_TIME is monotonically increasing in general, some lateness
is possible. However, I used allowedLateness on my stream and still got the
inconsistencies.

Although the real-life use case is generically reading files from a folder,
the testing env has a set of files prepared in advance - these should be
read and produce the result.

You mentioned the “right” order of the files. Is it sorted by update time?
When running in parallel, is it possible that 2 files will be read in parallel,
and in case the latter one is smaller, the latest timestamp will be
handled first?


BTW, I tried to use a ContinuousEventTimeTrigger to make sure the window is
calculated, and got the processing to trigger multiple times, so I’m not sure
exactly how this type of trigger works.

Thanks




From: Fabian Hueske <fhue...@gmail.com>
Sent: Monday, August 26, 2019 11:06 AM
To: Hanan Yehudai <hanan.yehu...@radcom.com>
Cc: user@flink.apache.org
Subject: Re: tumbling event time window , parallel

Hi,

Can you share a few more details about the data source?
Are you continuously ingesting files from a folder?

You are correct, that the parallelism should not affect the results, but there 
are a few things that can affect that:
1) non-determnistic keys
2) out-of-order data with inappropriate watermarks

Note that watermark configuration for file ingests can be difficult and that 
you need to ensure that files are read in the "right" order.
AFAIK, Flink's continuous file source uses the modification timestamp of files 
to determine the read order.

Best, Fabian

On Sun, Aug 25, 2019 at 7:32 PM, Hanan Yehudai
<hanan.yehu...@radcom.com> wrote:
I have an issue with tumbling windows running in parallel.

I run a job on a set of CSV files.

When the parallelism is set to 1, I get the proper results.
When it runs in parallel, I get no output.
Is it due to the fact that the parallel streams take the MAX(watermark) from all
the parallel sources, and only one of the streams advances the watermark?

It seems wrong that the result is not deterministic and depends on the
parallelism level.
What am I doing wrong?



Re: FLINK WEEKLY 2019/35

2019-09-02 Thread Jark Wu
Great summary. Thanks, Zili.



> On Sep 2, 2019, at 11:18, Zili Chen wrote:
> 
> FLINK WEEKLY 2019/35 
> 
> The FLINK community is busy developing new features for 1.10, and many discussions
> about FLINK's current limitations (functional, configuration, and documentation
> issues alike) are in full swing. Last week the activity on the user-zh list grew
> noticeably, and community developers and users replied to questions very quickly;
> the growth of the FLINK Chinese community is plain to see. As usual, this week's
> FLINK community news comes in three parts: user-list Q&A, FLINK development
> progress, and community events.
> USER
> 
> flink 1.9 errors when consuming from Kafka
> 
> 
> The actual problem was with the use of the BLINK planner; developers from Alibaba explained the right way to use the BLINK planner.
> 
> Questions about using table DDL with the flink 1.9 blink planner
> 
> flink 1.9 Blink planner create view issue
> 
> 
> Likewise, questions about how to use the BLINK planner correctly.
> 
> On the overly complex construction of the elasticSearch table sink
> 
> 
> How to connect query results to an ES sink.
> 
> On serialization issues when using RocksDB as the flink state backend
> 
> 
> Upgrade to FLINK 1.8 and use POJO Schema Evolution to support state schema evolution.
> 
> Using Checkpoints
> 
> 
> How to restore a job from a Checkpoint rather than a Savepoint; the parallelism can be adjusted to some extent when restoring.
> 
> FLINK 1.9 Docker images
> 
> FLINK 1.9 Docker images have been released, including builds for Scala 2.11 and 2.12.
> 
> How can TMs distribute evenly over Flink on YARN cluster?
> 
> 
> FLINK currently cannot guarantee that TMs are spread across different nodes when a job is started on YARN.
> 
> type error with generics
> 
> 
> The FLINK Java API sometimes requires type information to be added manually; in Scala, implicits are available, so the two APIs can behave quite differently.
> 
> Re: Flink operators for Kubernetes
> 
> 
> A FLINK operator for k8s has been developed by members of the Apache Beam community; users who need FLINK on k8s can try it out.
> 
> Is there Go client for Flink?
> 
> 
> FLINK currently only provides a Java Client and the REST API; Go users can use the REST API to submit
> and monitor FLINK jobs.
> 
> How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?
> 
> 
> Best practices for large FLINK jobs with large uberjars, which are mainly limited by some shortcomings
> of the FLINK Resource Manager. Developers from Alibaba and Tencent shared their own solutions for large jobs with large jars.
> DEV
> 
> [DISCUSS] FLIP-57 - Rework FunctionCatalog
> 
> 
> Bowen Li's FLIP-57 aims to provide a better development and authoring experience for FLINK SQL.
> 
> [DISCUSS] FLIP-60: Restructure the Table API & SQL documentation
> 
> 
> Timo Walther's FLIP-60 aims to promote the Table API & SQL documentation, previously subordinate to the DataStream API
> documentation, to a first-class level. Many FLINK users implement their jobs by writing SQL, so better documentation improves the experience of looking up relevant information during development.
> 
> [DISCUSS] FLIP-59: Enable execution configuration from Configuration object
> 
> 
> Dawid Wysakowicz's FLIP-59 is closely related to FLIP-54; both focus on improving FLINK's configuration. Currently, FLINK's
> execution configuration can only be set from within the program, which is inconsistent with many other settings that can be passed via configuration files or command-line arguments.
> 
> [DISCUSS] Simplify Flink's cluster level RestartStrategy configuration
> 
> 
> Till Rohrmann started a discussion on simplifying FLINK's cluster-level restart strategy configuration. The current
> restart strategy configuration has become complex as it evolved, mainly because there are many default behaviors in addition to the recommended restart-strategy setting.
> 
> Re: [DISCUSS] Flink client api enhancement for downstream project
> 
> 
> Kostas Kloudas gave an update on the Client API rework; prototypes of JobClient and of the Executor for multiple deployment
> backends, following the design document, are already under development.
> NEWS
> 
> [ANNOUNCE] Apache Flink-shaded 8.0 released
> 
> 
> Apache Flink-shaded 8.0 has been released; Chesnay Schepler was the release manager. This project provides
> shaded dependencies for FLINK.
> 
> [DISCUSS] Releasing Flink 1.8.2
> 
> 
> jincheng sun started a discussion on releasing FLINK 1.8.2; version 1.8.2 is expected to be released soon.
> 
> Best,

TaskManager process continues to work after termination

2019-09-02 Thread Ustinov Anton
Hello, I have a standalone cluster setup with Flink 1.8. Task manager processes 
are configured via systemd units with the always-restart policy. An error occurred 
during execution of the JobGraph and caused termination of the task manager. 
Logs from the task manager:

{"time":"2019-09-02 
11:33:14.797","loglevel":"INFO","class":"org.apache.flink.runtime.taskmanager.Task","message":"Source:
 Custom Source -> Filter (7/8) (f6ba0f0040fa578a15a3d71396281a6e) switched from 
RUNNING to FAILED.","host":"clickstream-flink08"}
java.lang.RuntimeException: Buffer pool is destroyed.
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 22 more
{"time":"2019-09-02 
11:33:14.797","loglevel":"ERROR","class":"org.apache.flink.runtime.taskmanager.Task","message":"FATAL
 - exception in resource cleanup of task Source: Custom Source (2/8) 
(0d0fd38e421b5f2ac389303787ea1f54).","host":"clickstream-flink08"}
java.lang.IllegalStateException: Memory manager has been shut down.
at 
org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:828)
at java.lang.Thread.run(Thread.java:748)
{"time":"2019-09-02 
11:33:14.801","loglevel":"INFO","class":"org.apache.flink.runtime.taskmanager.Task","message":"Freeing
 task resources for Source: Custom Source -> Filter (7/8) 
(f6ba0f0040fa578a15a3d71396281a6e).","host":"clickstream-flink08"}
{"time":"2019-09-02 
11:33:14.801","loglevel":"ERROR","class":"org.apache.flink.runtime.taskmanager.Task","message":"FATAL
 - exception in resource cleanup 

Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-09-02 Thread Yang Wang
Hi Dadashov,


Regarding your questions.


> Q1: Do all those 800 nodes download in batches of 3 at a time?

The 800+ containers will be allocated on different yarn nodes. By default,
the LocalResourceVisibility is APPLICATION, so the jars will be downloaded only
once and shared by all taskmanager containers of the same application on the
same node. And the downloads are not limited to batches of 3: even though the
replication factor of your jars is 3 (hdfs blocks located on 3 different
datanodes), a datanode can serve multiple downloads. The limit is the bandwidth
of the datanode. I guess the bandwidth of your hdfs datanodes is not very good,
so increasing the replication factor of the fat jar will help to reduce the
downloading time. A JIRA ticket has been created for this.[1]


> Q2: What is the recommended way of handling a 400MB+ Uberjar with 800+
containers?

From our online production experience, there are at least 3 ways to optimize
this.

   1. Increase the replication factor of the jars in the yarn distributed cache.[1]
   2. Increase the number of container launch threads or use NMClientAsync so that
   the allocated containers can be started asap. Even though startContainer in the
   yarn nodemanager is asynchronous, launching a container in
   FlinkYarnResourceManager is a blocking call, so we have to start containers
   one by one.[2]
   3. Use the yarn public cache to eliminate unnecessary jar downloading. For
   example, flink-dist.jar would not have to be uploaded and then localized for
   each application; see the sketch below.[3]
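To give an idea of what option 3 means in YARN terms, a hedged sketch only (this
is not the actual FLINK-13938 implementation; the path handling is illustrative,
and the jar on HDFS would additionally need world-readable permissions for
PUBLIC visibility to take effect):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;

public final class PublicDistJar {

    // Registers flink-dist.jar with PUBLIC visibility so the node-level public
    // cache can reuse the localized file across YARN applications.
    public static LocalResource asPublicResource(FileSystem fs, Path distJar) throws Exception {
        FileStatus status = fs.getFileStatus(distJar);
        return LocalResource.newInstance(
            ConverterUtils.getYarnUrlFromPath(distJar),
            LocalResourceType.FILE,
            LocalResourceVisibility.PUBLIC,    // the default for user jars is APPLICATION
            status.getLen(),
            status.getModificationTime());
    }
}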


Unfortunately, the three features above are still under development. As a
workaround, you could set dfs.replication=10 in the hdfs-site.xml of the
HADOOP_CONF_DIR on the flink client machine.
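For example, a minimal property snippet for that hdfs-site.xml (assuming no other
replication overrides are in place on the client):

<property>
  <!-- New files written by this client (e.g. the uploaded uberjar) get 10 replicas -->
  <name>dfs.replication</name>
  <value>10</value>
</property>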



[1].https://issues.apache.org/jira/browse/FLINK-12343

[2].https://issues.apache.org/jira/browse/FLINK-13184

[3].https://issues.apache.org/jira/browse/FLINK-13938



Best,

Yang

Zhu Zhu wrote on Mon, Sep 2, 2019 at 10:42 AM:

> Hi Elkhan,
>
> >>Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
> >>We are intending to use Flink Real-time pipeline for Replay from
> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
> and real-time. So for batch Flink job, the containers will be released
> once the job is done.
> >>I guess your job is real-time flink, so  you can share the  jars from
> already long-running jobs.
>
> This optimization is conducted by making flink dist jar a public
> distributed cache of YARN.
> In this way, the localized dist jar can be shared by different YARN
> applications and it will not be removed when the YARN application which
> localized it terminates.
> This requires some changes in Flink though.
> We will open an issue to contribute this optimization to the community.
>
> Thanks,
> Zhu Zhu
>
> SHI Xiaogang wrote on Sat, Aug 31, 2019 at 12:57 PM:
>
>> Hi Dadashov,
>>
>> You may have a look at method YarnResourceManager#onContainersAllocated
>> which will launch containers (via NMClient#startContainer) after containers
>> are allocated.
>> The launching is performed in the main thread of YarnResourceManager and
>> the launching is synchronous/blocking. Consequently, the containers will be
>> launched one by one.
>>
>> Regards,
>> Xiaogang
>>
>> Elkhan Dadashov wrote on Sat, Aug 31, 2019 at 2:37 AM:
>>
>>> Thanks  everyone for valuable input and sharing  your experience for
>>> tackling the issue.
>>>
>>> Regarding suggestions :
>>> - We provision some common jars in all cluster nodes  *-->*  but this
>>> requires dependence on Infra Team schedule for handling common jars/updating
>>> - Making Uberjar slimmer *-->* tried even with 200 MB Uberjar (half
>>> size), did not improve much. Only 100 containers could be started in time,
>>> but we were then receiving:
>>>
>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>>> start container.
>>> This token is expired. current time is 1566422713305 found 1566422560552
>>> Note: System times on machines may be out of sync. Check system time and 
>>> time zones.
>>>
>>>
>>> - It would be nice to see FLINK-13184
>>>  , but expected
>>> version that will get in is 1.10
>>> - Increase replication factor --> It would be nice to have a Flink conf
>>> for setting the replication factor for only the Flink job jars, but not the output.
>>> It is also challenging to set a replication factor for a not-yet-existing
>>> directory; the new files will have the default replication factor. Will explore
>>> the HDFS cache option.
>>>
>>> Maybe another option can be:
>>> - Letting yet-to-be-started Task Managers (or NodeManagers) download the
>>> jars from already started TaskManagers in P2P fashion, so as not to be
>>> blocked on HDFS replication.
>>>
>>> A Spark job with the exact same size jar and 800 executors, without any
>>> tuning, can start without any issue on the same cluster in less than a minute.
>>>
>>> *Further questions:*
>>>
>>> *@ SHI Xiaogang:*
>>>
>>> I see that all 800 requests are sent concurrently :
>>>
>>> 2019-08-30 00:28:28.516 

Re: Assigning UID to Flink SQL queries

2019-09-02 Thread Dawid Wysakowicz
Hi Yuval,

Unfortunately, currently you cannot assign UIDs in table programs. The
reason is that the uid is used for reassigning state upon restart. Table
programs are automatically compiled to an executable program. This program
might change depending on many factors: table statistics, applied rules,
etc. As of now it can also change between Flink versions if we implement
new optimizations, operators, etc. Right now the user also does not have
access to the execution graph.
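For contrast, a minimal sketch of the DataStream-level hook the question refers
to (the operator, uid string, and name are placeholders); nothing equivalent is
currently exposed for SQL queries:

DataStream<String> normalized = input
    .map(value -> value.toLowerCase())
    .uid("normalize-map")    // stable ID used to match operator state on restore
    .name("Normalize");      // human-readable name shown in the UI and thread names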

This is though something that we definitely want to address at some point.

Best,

Dawid

On 30/08/2019 12:51, Yuval Itzchakov wrote:
> Anyone?
>
> On Tue, 27 Aug 2019, 17:23 Yuval Itzchakov wrote:
>
> Hi,
>
> We have a bunch of Flink SQL queries running in our Flink
> environment. For
> regular Table API interactions, we can override `uid`, which also
> gives us an
> indicative name for the thread/UI to look at. For Flink SQL
> queries, this
> doesn't seem to be the case, which results in very verbose names
> (essentially
> the entire query plan) shown in the UI / thread names while debugging.
>
> Is there any convenient way to set / override the UID for SQL defined
> queries?
>
> -- 
> Best Regards,
> Yuval Itzchakov.
>

