Re: How to use Low-level Joins API

2019-08-06 Thread Yuta Morisawa

Hi Victor

> Is it possible stream1 and stream2 don't have common keys?  You may
verify this by logging the key of the currently processed element.


I misunderstood the usage.
I thought stream1 and stream2 had separate contexts and could each access the
other's state store.
In fact, processElement1 and processElement2 only share the same keyed state
when they process elements with the same key.
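
For reference, a minimal sketch of the corrected setup (the Tuple2 element layout and String types are illustrative assumptions, not the actual job): both inputs are keyed on the same field, so processElement1 and processElement2 see the same keyed state for a given key, and the value() read is null-checked in case an element from stream2 arrives first.

---
stream1.connect(stream2)
   .keyBy(0, 0)   // same key field on both inputs
   .process(new CoProcessFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
     private transient ValueState<String> data;

     @Override
     public void open(Configuration parameters) {
       data = getRuntimeContext().getState(
           new ValueStateDescriptor<>("data", String.class));
     }

     @Override
     public void processElement1(Tuple2<String, String> v1, Context ctx,
                                 Collector<String> out) throws Exception {
       data.update(v1.f1);
     }

     @Override
     public void processElement2(Tuple2<String, String> v2, Context ctx,
                                 Collector<String> out) throws Exception {
       String v1 = data.value();   // non-null only for keys already seen on stream1
       if (v1 != null) {
         out.collect(v1 + v2.f1);
       }
     }
   });
---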


Now I can join the two streams successfully.
Thank you for your comment.

Best,
Yuta



On 2019/08/07 13:49, Victor Wong wrote:

Hi Yuta,


I made sure the 'ValueState data' has data from stream1 with the IDE

debugger but in spite of that, processElement2 can't access it.

Since `processElement1` and `processElement2`  use the same `Context`, I think 
there is no state access issue.
Is it possible stream1 and stream2 don't have common keys? You may verify this
by logging the key of the currently processed element.

Best,
Victor

On 2019/8/7, 10:56 AM, "Yuta Morisawa"  wrote:

 Hi Yun
 
 Thank you for replying.

  >Have you set a default value for the state ?
 Actually, the constructor of the ValueStateDescriptor with default value
 is deprecated so I don't set it.
 
 The problem occurs when the stream1 comes first.

 I made sure the 'ValueState data' has data from stream1 with the IDE
 debugger but in spite of that, processElement2 can't access it.
 
 On 2019/08/07 11:43, Yun Gao wrote:

 > Hi Yuta,
 >Have you set a default value for the state ? If the state did not
 > have a default value and the records from stream2 comes first for a
 > specific key, then the state would never be set with a value, thus the
 > return value will be null.
 >
 > Best,
 > Yun
 >
 >
 > --
 > From:Yuta Morisawa 
 > Send Time:2019 Aug. 7 (Wed.) 08:56
 > To:user 
  > Subject:How to use Low-level Joins API
 >
 > Hi
 >
 > I am trying to use low-level joins.
 > According to the doc, the way is creating a state and access it from
 > both streams, but I can't.
 >
 > 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html
 >
 > This is a snippet of my code.
 > It seems that the processElement1,2 have different ValueStates so 
that
 > v1 in processElement2 is always null.
 >
 > ---
 > stream1.connect(stream2).keyBy(0,0).process(new MyCPF());
 >
 > public class MyCPF extends CoProcessFunction{
 >ValueState data;
 >
 >processElement1(v1){
 >  data.update(v1);
 >}
 >
 >processElement2(v2){
 >  v1 = data.value() // v1 is always null
 >  out.collect(v1 + v2)
 >}
 >
 >open(){
 >  data = getRuntimeContext().getState(descriptor);
 >}
 >
 > }
 > ---
 >
 > Can you tell me the collect way of the low-level joins and send me a
 > sample code if you have?
 >
 > --
 > Thank you
 > Yuta
 >
 >
 





Configure Prometheus Exporter

2019-08-06 Thread Chaoran Yu
Hello guys,

   Does anyone know if the Prometheus metrics exported via the JMX reporter
or the Prometheus reporter can be configured using a YAML file similar to
this one?
If there is such support in Flink, how do I tell Flink the path to my YAML
configuration file (e.g. maybe through a setting in flink-conf)?

Thanks,
Chaoran


Re: How to use Low-level Joins API

2019-08-06 Thread Victor Wong
Hi Yuta,

> I made sure the 'ValueState data' has data from stream1 with the IDE
debugger but in spite of that, processElement2 can't access it.

Since `processElement1` and `processElement2`  use the same `Context`, I think 
there is no state access issue.
Is it possible stream1 and stream2 don't have common keys? You may verify this
by logging the key of the currently processed element.

Best,
Victor

On 2019/8/7, 10:56 AM, "Yuta Morisawa"  wrote:

Hi Yun

Thank you for replying.
 >Have you set a default value for the state ?
Actually, the constructor of the ValueStateDescriptor with default value 
is deprecated so I don't set it.

The problem occurs when the stream1 comes first.
I made sure the 'ValueState data' has data from stream1 with the IDE 
debugger but in spite of that, processElement2 can't access it.

On 2019/08/07 11:43, Yun Gao wrote:
> Hi Yuta,
>Have you set a default value for the state ? If the state did not 
> have a default value and the records from stream2 comes first for a 
> specific key, then the state would never be set with a value, thus the 
> return value will be null.
> 
> Best,
> Yun
> 
> 
> --
> From:Yuta Morisawa 
> Send Time:2019 Aug. 7 (Wed.) 08:56
> To:user 
> Subject:How to use Low-level Joins API
> 
> Hi
> 
> I am trying to use low-level joins.
> According to the doc, the way is creating a state and access it from
> both streams, but I can't.
> 
> 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html
> 
> This is a snippet of my code.
> It seems that the processElement1,2 have different ValueStates so that
> v1 in processElement2 is always null.
> 
> ---
> stream1.connect(stream2).keyBy(0,0).process(new MyCPF());
> 
> public class MyCPF extends CoProcessFunction{
>ValueState data;
> 
>processElement1(v1){
>  data.update(v1);
>}
> 
>processElement2(v2){
>  v1 = data.value() // v1 is always null
>  out.collect(v1 + v2)
>}
> 
>open(){
>  data = getRuntimeContext().getState(descriptor);
>}
> 
> }
> ---
> 
> Can you tell me the collect way of the low-level joins and send me a
> sample code if you have?
> 
> --
> Thank you
> Yuta
> 
> 





Re: How to use Low-level Joins API

2019-08-06 Thread Yuta Morisawa

Hi Yun

Thank you for replying.
>Have you set a default value for the state ?
Actually, the ValueStateDescriptor constructor that takes a default value
is deprecated, so I don't set one.


The problem occurs even when stream1 comes first.
I confirmed with the IDE debugger that the 'ValueState data' holds data from
stream1, but even so, processElement2 cannot access it.


On 2019/08/07 11:43, Yun Gao wrote:

Hi Yuta,
       Have you set a default value for the state ? If the state did not 
have a default value and the records from stream2 comes first for a 
specific key, then the state would never be set with a value, thus the 
return value will be null.


Best,
Yun


--
From:Yuta Morisawa 
Send Time:2019 Aug. 7 (Wed.) 08:56
To:user 
Subject:How to use Low-level Joins API

Hi

I am trying to use low-level joins.
According to the doc, the approach is to create a state and access it from
both streams, but I can't get it to work.


https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html

This is a snippet of my code.
It seems that processElement1 and processElement2 have different ValueStates, so
v1 in processElement2 is always null.

---
stream1.connect(stream2).keyBy(0,0).process(new MyCPF());

public class MyCPF extends CoProcessFunction{
   ValueState data;

   processElement1(v1){
 data.update(v1);
   }

   processElement2(v2){
 v1 = data.value() // v1 is always null
 out.collect(v1 + v2)
   }

   open(){
 data = getRuntimeContext().getState(descriptor);
   }

}
---

Can you tell me the correct way to do low-level joins, and send me
sample code if you have any?

--
Thank you
Yuta






Re: How to use Low-level Joins API

2019-08-06 Thread Yun Gao
Hi Yuta,
  Have you set a default value for the state? If the state does not have a
default value and the records from stream2 come first for a specific key, then
the state will never have been set with a value, so the returned value will be null.

Best,
Yun



--
From:Yuta Morisawa 
Send Time:2019 Aug. 7 (Wed.) 08:56
To:user 
Subject:How to use Low-level Joins API

Hi

I am trying to use low-level joins.
According to the doc, the approach is to create a state and access it from
both streams, but I can't get it to work.

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html

This is a snippet of my code.
It seems that processElement1 and processElement2 have different ValueStates, so
v1 in processElement2 is always null.

---
stream1.connect(stream2).keyBy(0,0).process(new MyCPF());

public class MyCPF extends CoProcessFunction{
  ValueState data;

  processElement1(v1){
data.update(v1);
  }

  processElement2(v2){
v1 = data.value() // v1 is always null
out.collect(v1 + v2)
  }

  open(){
data = getRuntimeContext().getState(descriptor);
  }

}
---

Can you tell me the correct way to do low-level joins, and send me
sample code if you have any?

--
Thank you
Yuta



How to use Low-level Joins API

2019-08-06 Thread Yuta Morisawa

Hi

I am trying to use low-level joins.
According to the doc, the approach is to create a state and access it from
both streams, but I can't get it to work.


https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html

This is a snippet of my code.
It seems that processElement1 and processElement2 have different ValueStates, so
v1 in processElement2 is always null.


---
stream1.connect(stream2).keyBy(0,0).process(new MyCPF());

public class MyCPF extends CoProcessFunction{
 ValueState data;

 processElement1(v1){
   data.update(v1);
 }

 processElement2(v2){
   v1 = data.value() // v1 is always null
   out.collect(v1 + v2)
 }

 open(){
   data = getRuntimeContext().getState(descriptor);
 }

}
---

Can you tell me the correct way to do low-level joins, and send me
sample code if you have any?


--
Thank you
Yuta



Re: From Kafka Stream to Flink

2019-08-06 Thread Maatary Okouya
Fabian,

Ultimately, I just want to perform a join on the latest values for each key.

On Tue, Aug 6, 2019 at 8:07 PM Maatary Okouya 
wrote:

> Fabian,
>
> could you please clarify the following statement:
>
> However joining an append-only table with this view without adding
> temporal join condition, means that the stream is fully materialized as
> state.
> This is because previously emitted results must be updated when the view
> changes.
> It really depends on the semantics of the join and query that you need,
> how much state the query will need to maintain.
>
>
> I am not sure to understand the problem. If i have to append-only table
> and perform some join on it, what's the issue ?
>
>
> On Tue, Aug 6, 2019 at 8:03 PM Maatary Okouya 
> wrote:
>
>> Thank you for the clarification. Really appreciated.
>>
>> Is Last_val part of the API ?
>>
>> On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> Flink does not distinguish between streams and tables. For the Table API
>>> / SQL, there are only tables that are changing over time, i.e., dynamic
>>> tables.
>>> A Stream in the Kafka Streams or KSQL sense, is in Flink a Table with
>>> append-only changes, i.e., records are only inserted and never deleted or
>>> modified.
>>> A Table in the Kafka Streams or KSQL sense, is in Flink a Table that has
>>> upsert and delete changes, i.e., the table has a unique key and records are
>>> inserted, deleted, or updated per key.
>>>
>>> In the current version, Flink does not have native support to ingest an
>>> upsert stream as a dynamic table (right now only append-only tables can be
>>> ingested, native support for upsert tables will be added soon.).
>>> However, you can create a view with the following SQL query on an
>>> append-only table that creates an upsert table:
>>>
>>> SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
>>> FROM appendOnlyTable
>>> GROUP BY key
>>>
>>> Given, this view, you can run all kinds of SQL queries on it.
>>> However joining an append-only table with this view without adding
>>> temporal join condition, means that the stream is fully materialized as
>>> state.
>>> This is because previously emitted results must be updated when the view
>>> changes.
>>> It really depends on the semantics of the join and query that you need,
>>> how much state the query will need to maintain.
>>>
>>> An alternative to using Table API / SQL and it's dynamic table
>>> abstraction is to use Flink's DataStream API and ProcessFunctions.
>>> These APIs are more low level and expose access to state and timers,
>>> which are the core ingredients for stream processing.
>>> You can implement pretty much all logic of KStreams and more in these
>>> APIs.
>>>
>>> Best, Fabian
>>>
>>>
>>> Am Di., 23. Juli 2019 um 13:06 Uhr schrieb Maatary Okouya <
>>> maatarioko...@gmail.com>:
>>>
 I would like to have a KTable, or maybe in Flink term a dynamic Table,
 that only contains the latest value for each keyed record. This would allow
 me to perform aggregation and join, based on the latest state of every
 record, as opposed to every record over time, or a period of time.

 On Sun, Jul 21, 2019 at 8:21 AM miki haiat  wrote:

> Can you elaborate more  about your use case .
>
>
> On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya <
> maatarioko...@gmail.com> wrote:
>
>> Hi,
>>
>> I am a user of Kafka Stream so far. However, because i have been face
>> with several limitation in particular in performing Join on KTable.
>>
>> I was wondering what is the appraoch in Flink to achieve  (1) the
>> concept of KTable, i.e. a Table that represent a changeLog, i.e. only the
>> latest version of all keyed records,  and (2) joining those.
>>
>> There are currently a lot of limitation around that on Kafka Stream,
>> and i need that for performing some ETL process, where i need to mirror
>> entire databases in Kafka, and then do some join on the table to emit the
>> logical entity in Kafka Topics. I was hoping that somehow i could acheive
>> that by using FLink as intermediary.
>>
>> I can see that you support any kind of join, but i just don't see the
>> notion of Ktable.
>>
>>
>>


Re: From Kafka Stream to Flink

2019-08-06 Thread Maatary Okouya
Fabian,

could you please clarify the following statement:

However joining an append-only table with this view without adding temporal
join condition, means that the stream is fully materialized as state.
This is because previously emitted results must be updated when the view
changes.
It really depends on the semantics of the join and query that you need, how
much state the query will need to maintain.


I am not sure I understand the problem. If I have an append-only table and
perform some join on it, what's the issue?


On Tue, Aug 6, 2019 at 8:03 PM Maatary Okouya 
wrote:

> Thank you for the clarification. Really appreciated.
>
> Is Last_val part of the API ?
>
> On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> Flink does not distinguish between streams and tables. For the Table API
>> / SQL, there are only tables that are changing over time, i.e., dynamic
>> tables.
>> A Stream in the Kafka Streams or KSQL sense, is in Flink a Table with
>> append-only changes, i.e., records are only inserted and never deleted or
>> modified.
>> A Table in the Kafka Streams or KSQL sense, is in Flink a Table that has
>> upsert and delete changes, i.e., the table has a unique key and records are
>> inserted, deleted, or updated per key.
>>
>> In the current version, Flink does not have native support to ingest an
>> upsert stream as a dynamic table (right now only append-only tables can be
>> ingested, native support for upsert tables will be added soon.).
>> However, you can create a view with the following SQL query on an
>> append-only table that creates an upsert table:
>>
>> SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
>> FROM appendOnlyTable
>> GROUP BY key
>>
>> Given, this view, you can run all kinds of SQL queries on it.
>> However joining an append-only table with this view without adding
>> temporal join condition, means that the stream is fully materialized as
>> state.
>> This is because previously emitted results must be updated when the view
>> changes.
>> It really depends on the semantics of the join and query that you need,
>> how much state the query will need to maintain.
>>
>> An alternative to using Table API / SQL and it's dynamic table
>> abstraction is to use Flink's DataStream API and ProcessFunctions.
>> These APIs are more low level and expose access to state and timers,
>> which are the core ingredients for stream processing.
>> You can implement pretty much all logic of KStreams and more in these
>> APIs.
>>
>> Best, Fabian
>>
>>
>> Am Di., 23. Juli 2019 um 13:06 Uhr schrieb Maatary Okouya <
>> maatarioko...@gmail.com>:
>>
>>> I would like to have a KTable, or maybe in Flink term a dynamic Table,
>>> that only contains the latest value for each keyed record. This would allow
>>> me to perform aggregation and join, based on the latest state of every
>>> record, as opposed to every record over time, or a period of time.
>>>
>>> On Sun, Jul 21, 2019 at 8:21 AM miki haiat  wrote:
>>>
 Can you elaborate more  about your use case .


 On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya 
 wrote:

> Hi,
>
> I am a user of Kafka Stream so far. However, because i have been face
> with several limitation in particular in performing Join on KTable.
>
> I was wondering what is the appraoch in Flink to achieve  (1) the
> concept of KTable, i.e. a Table that represent a changeLog, i.e. only the
> latest version of all keyed records,  and (2) joining those.
>
> There are currently a lot of limitation around that on Kafka Stream,
> and i need that for performing some ETL process, where i need to mirror
> entire databases in Kafka, and then do some join on the table to emit the
> logical entity in Kafka Topics. I was hoping that somehow i could acheive
> that by using FLink as intermediary.
>
> I can see that you support any kind of join, but i just don't see the
> notion of Ktable.
>
>
>


Re: From Kafka Stream to Flink

2019-08-06 Thread Maatary Okouya
Thank you for the clarification. Really appreciated.

Is LAST_VAL part of the API?

On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske  wrote:

> Hi,
>
> Flink does not distinguish between streams and tables. For the Table API /
> SQL, there are only tables that are changing over time, i.e., dynamic
> tables.
> A Stream in the Kafka Streams or KSQL sense, is in Flink a Table with
> append-only changes, i.e., records are only inserted and never deleted or
> modified.
> A Table in the Kafka Streams or KSQL sense, is in Flink a Table that has
> upsert and delete changes, i.e., the table has a unique key and records are
> inserted, deleted, or updated per key.
>
> In the current version, Flink does not have native support to ingest an
> upsert stream as a dynamic table (right now only append-only tables can be
> ingested, native support for upsert tables will be added soon.).
> However, you can create a view with the following SQL query on an
> append-only table that creates an upsert table:
>
> SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
> FROM appendOnlyTable
> GROUP BY key
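
LAST_VAL is not, as far as I can tell, a built-in function in Flink 1.8; a view like the one above assumes a user-defined aggregate along these lines (String-typed purely for illustration):

---
public static class LastVal extends AggregateFunction<String, LastVal.Acc> {
  public static class Acc { public String last; }
  @Override public Acc createAccumulator() { return new Acc(); }
  @Override public String getValue(Acc acc) { return acc.last; }
  // called by the runtime for every row in the group; the most recently seen value wins
  public void accumulate(Acc acc, String value) { acc.last = value; }
}
// tableEnv.registerFunction("LAST_VAL", new LastVal());
---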
>
> Given, this view, you can run all kinds of SQL queries on it.
> However joining an append-only table with this view without adding
> temporal join condition, means that the stream is fully materialized as
> state.
> This is because previously emitted results must be updated when the view
> changes.
> It really depends on the semantics of the join and query that you need,
> how much state the query will need to maintain.
>
> An alternative to using Table API / SQL and it's dynamic table abstraction
> is to use Flink's DataStream API and ProcessFunctions.
> These APIs are more low level and expose access to state and timers, which
> are the core ingredients for stream processing.
> You can implement pretty much all logic of KStreams and more in these APIs.
>
> Best, Fabian
>
>
> Am Di., 23. Juli 2019 um 13:06 Uhr schrieb Maatary Okouya <
> maatarioko...@gmail.com>:
>
>> I would like to have a KTable, or maybe in Flink term a dynamic Table,
>> that only contains the latest value for each keyed record. This would allow
>> me to perform aggregation and join, based on the latest state of every
>> record, as opposed to every record over time, or a period of time.
>>
>> On Sun, Jul 21, 2019 at 8:21 AM miki haiat  wrote:
>>
>>> Can you elaborate more  about your use case .
>>>
>>>
>>> On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya 
>>> wrote:
>>>
 Hi,

 I am a user of Kafka Stream so far. However, because i have been face
 with several limitation in particular in performing Join on KTable.

 I was wondering what is the appraoch in Flink to achieve  (1) the
 concept of KTable, i.e. a Table that represent a changeLog, i.e. only the
 latest version of all keyed records,  and (2) joining those.

 There are currently a lot of limitation around that on Kafka Stream,
 and i need that for performing some ETL process, where i need to mirror
 entire databases in Kafka, and then do some join on the table to emit the
 logical entity in Kafka Topics. I was hoping that somehow i could acheive
 that by using FLink as intermediary.

 I can see that you support any kind of join, but i just don't see the
 notion of Ktable.





how to get the code produced by Flink Code Generator

2019-08-06 Thread Vincent Cai
Hi Users,
In Spark, we can invoke the Dataset method "queryExecution.debug.codegen()" to get
the code produced by Catalyst.
Is there a similar API in Flink?


reference link : 
https://medium.com/virtuslab/spark-sql-under-the-hood-part-i-26077f85ebf0  







Regards
Vincent  Cai

Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread zhijiang
Hi Paul,

In theory a broadcast operator cannot be chained, because it is an all-to-all
exchange; chaining is only feasible for one-to-one modes like forward.
When operators are chained, the next operator can process the raw record emitted by
the head operator directly. If they are not chained, the emitted record must be
serialized into a buffer, which the downstream operator consumes, via the network or
locally. So in theory the chained path has the best performance compared to the
non-chained one.

In your case, if you cannot avoid the broadcast requirement, you have to accept the
non-chained path and test whether the real performance is acceptable to you. If it
does not reach your requirements, we could consider further improvements.

Best,
Zhijiang
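
For illustration, the broadcast-state pattern under discussion (approach 1, a schema stream broadcast to the data operators) might look roughly like the sketch below; schemaStream, dataStream, Record, Schema, Output and the apply() helper are placeholders, not code from this thread.

---
MapStateDescriptor<String, Schema> schemaDescriptor =
    new MapStateDescriptor<>("schema", Types.STRING, Types.POJO(Schema.class));

BroadcastStream<Schema> schemaBroadcast = schemaStream.broadcast(schemaDescriptor);

DataStream<Output> result = dataStream
    .connect(schemaBroadcast)
    .process(new BroadcastProcessFunction<Record, Schema, Output>() {
      @Override
      public void processElement(Record value, ReadOnlyContext ctx,
                                 Collector<Output> out) throws Exception {
        // look up the latest schema previously stored by processBroadcastElement
        Schema schema = ctx.getBroadcastState(schemaDescriptor).get(value.getSchemaId());
        out.collect(apply(schema, value));   // hypothetical helper
      }

      @Override
      public void processBroadcastElement(Schema schema, Context ctx,
                                          Collector<Output> out) throws Exception {
        ctx.getBroadcastState(schemaDescriptor).put(schema.getId(), schema);
      }
    });
---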
--
From:Piotr Nowojski 
Send Time:2019年8月6日(星期二) 14:55
To:黄兆鹏 
Cc:user 
Subject:Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

On 6 Aug 2019, at 13:11, 黄兆鹏  wrote:
Hi, Piotrek,
I previously considered your first advice(use union record type), but I found 
that the schema would be only sent to one subtask of the operator(for example, 
operatorA), and other subtasks of the operator are not aware of it. 
In this case is there anything I have missed? 

Thank you!





-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Have you measured the performance impact of braking the operator chain?

This is a current limitation of Flink chaining, that if an operator has two 
inputs, it can be chained to something else (only one input operators are 
chained together). There are plans for the future to address this issue.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution, is to implement BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking and it 
just makes many things more complicated.

Piotrek
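
For illustration, the union-type workaround could use Flink's Either type, roughly as sketched below; records and schemas are the two input DataStreams, and Record and Schema are the placeholder types from this thread.

---
DataStream<Either<Record, Schema>> unioned =
    records.map(r -> Either.<Record, Schema>Left(r))
           .returns(new TypeHint<Either<Record, Schema>>() {})
    .union(
    schemas.map(s -> Either.<Record, Schema>Right(s))
           .returns(new TypeHint<Either<Record, Schema>>() {}));

// Downstream, each operator pattern-matches: when e.isLeft() it transforms e.left(),
// otherwise it forwards the Schema (e.right()) unchanged.
---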

On 6 Aug 2019, at 10:50, 黄兆鹏  wrote:
Hi Piotrek,
Thanks for your reply, my broadcast stream just listen to the changes of the 
schema, and it's very infrequent and very lightweight.

In fact there are two ways to solve my problem,

the first one is a broadcast stream that listen to the change of the schema, 
and broadcast to every operator that will handle the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  -> OperatorC
  ^   ^  ^
  |||
  BroadcastStream

the second approach is that I have an operator that will join my data and 
schema together and send to the downstream operators:
 DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
  ^ 
  |   
BroadcastStream


The benefits of the first approach is that the flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema will be broadcasted to each operator.
But the disadvantage of the first approache is that it breaks the operator 
chain, so operators may not be executed in the same slot and gain worse 
performance.

The second approach does not have the problem as the first one, but each 
message will carry its schema info among operators, it will cost about 2x for 
serialization and deserialization between operators.

Is there a better workaround that all the operators could notice the schema 
change and at the same time not breaking the operator chaining? 

Thanks!
-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Broadcasting will brake an operator chain. However my best guess is that Kafka 
source will be still a performance bottleneck in your job. Also Network 
exchanges add some measurable overhead only if your records are very 
lightweight and easy to process (for example if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results wether this is ok for you or not.

Piotrek 

> On 6 Aug 2019, at 09:56, 黄兆鹏  wrote:
> 
> Hi all, 
> My flink job has dy

Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread Wong Victor
Oops, accidentally sent the email.
The good news is that you don’t have to checkpoint the state of the Kafka 
consumers.

From: Wong Victor 
Date: Tuesday, August 6, 2019 at 11:31 PM
To: Piotr Nowojski , 黄兆鹏 
Cc: user 
Subject: Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,
If the performance impact of breaking the operator chain is significant, maybe you can
read the latest schema from Kafka within the operators.

It's a little complicated: you have to start a Kafka consumer in e.g.
`RichFunction#open()`, read from (the largest offset – 1), and handle new
messages as they come in.
The good news

From: Piotr Nowojski 
Date: Tuesday, August 6, 2019 at 8:55 PM
To: 黄兆鹏 
Cc: user 
Subject: Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek



On 6 Aug 2019, at 13:11, 黄兆鹏 
mailto:paulhu...@easyops.cn>> wrote:

Hi, Piotrek,
I previously considered your first advice(use union record type), but I found 
that the schema would be only sent to one subtask of the operator(for example, 
operatorA), and other subtasks of the operator are not aware of it.
In this case is there anything I have missed?

Thank you!






-- Original --
From:  "Piotr Nowojski"mailto:pi...@ververica.com>>;
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"mailto:paulhu...@easyops.cn>>;
Cc:  "user"mailto:user@flink.apache.org>>;
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

Hi,

Have you measured the performance impact of braking the operator chain?

This is a current limitation of Flink chaining, that if an operator has two 
inputs, it can be chained to something else (only one input operators are 
chained together). There are plans for the future to address this issue.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution, is to implement BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking and it 
just makes many things more complicated.

Piotrek



On 6 Aug 2019, at 10:50, 黄兆鹏 
mailto:paulhu...@easyops.cn>> wrote:

Hi Piotrek,
Thanks for your reply, my broadcast stream just listen to the changes of the 
schema, and it's very infrequent and very lightweight.

In fact there are two ways to solve my problem,

the first one is a broadcast stream that listen to the change of the schema, 
and broadcast to every operator that will handle the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  -> OperatorC
  ^   ^  ^
  |||
  BroadcastStream

the second approach is that I have an operator that will join my data and 
schema together and send to the downstream operators:
 DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
  ^
  |
BroadcastStream


The benefits of the first approach is that the flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema will be broadcasted to each operator.
But the disadvantage of the first approache is that it breaks the operator 
chain, so operators may not be executed in the same slot and gain worse 
performance.

The second approach does not have the problem as the first one, but each 
message will carry its schema info among operators, it will cost about 2x for 
serialization and deserialization between operators.

Is there a better workaround that all the operators could notice the schema 
change and at the same time not breaking the operator chaining?

Thanks!



-- Original --
From:  "Piotr Nowojski"mailto:pi...@ververica.com>>;
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"mailto:paulhu...@easyops.cn>>;
Cc:  "user"mailto:user@flink.apache.org>>;
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

Hi,

Broadcasting will brake an operator chain. However my best guess is that Kafka 
source will be still a performance bottleneck in your job. Also Network 
exchanges add some measurable overhead only if your records are very 
lightweight and easy to process (for example if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results wether this is ok for you or not.

Piotrek

> On 6 Aug 2019, at 09:56, 

Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread Wong Victor
Hi,
If the performance impact of breaking the operator chain is significant, maybe you can
read the latest schema from Kafka within the operators.

It's a little complicated: you have to start a Kafka consumer in e.g.
`RichFunction#open()`, read from (the largest offset – 1), and handle new
messages as they come in.
The good news
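
A rough sketch of this workaround (imports omitted; the topic name, SchemaParser, Schema/Record/Output types and the apply() helper are assumptions, not code from this thread):

---
public class SchemaAwareMapper extends RichMapFunction<Record, Output> {
  private transient KafkaConsumer<String, String> consumer;
  private volatile Schema currentSchema;

  @Override
  public void open(Configuration parameters) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");   // assumption
    props.put("group.id", "schema-reader-" + getRuntimeContext().getIndexOfThisSubtask());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    consumer = new KafkaConsumer<>(props);

    TopicPartition tp = new TopicPartition("schema-topic", 0);   // assumption: one partition
    consumer.assign(Collections.singletonList(tp));
    long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
    consumer.seek(tp, Math.max(0, end - 1));                     // start from (largest offset - 1)
    // poll once for the current schema; a real job would keep polling in a background thread
    for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(5))) {
      currentSchema = SchemaParser.parse(r.value());
    }
  }

  @Override
  public Output map(Record value) {
    return apply(currentSchema, value);   // hypothetical helper
  }

  @Override
  public void close() {
    if (consumer != null) {
      consumer.close();
    }
  }
}
---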

From: Piotr Nowojski 
Date: Tuesday, August 6, 2019 at 8:55 PM
To: 黄兆鹏 
Cc: user 
Subject: Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek


On 6 Aug 2019, at 13:11, 黄兆鹏 
mailto:paulhu...@easyops.cn>> wrote:

Hi, Piotrek,
I previously considered your first advice(use union record type), but I found 
that the schema would be only sent to one subtask of the operator(for example, 
operatorA), and other subtasks of the operator are not aware of it.
In this case is there anything I have missed?

Thank you!





-- Original --
From:  "Piotr Nowojski"mailto:pi...@ververica.com>>;
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"mailto:paulhu...@easyops.cn>>;
Cc:  "user"mailto:user@flink.apache.org>>;
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

Hi,

Have you measured the performance impact of braking the operator chain?

This is a current limitation of Flink chaining, that if an operator has two 
inputs, it can be chained to something else (only one input operators are 
chained together). There are plans for the future to address this issue.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution, is to implement BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking and it 
just makes many things more complicated.

Piotrek


On 6 Aug 2019, at 10:50, 黄兆鹏 
mailto:paulhu...@easyops.cn>> wrote:

Hi Piotrek,
Thanks for your reply, my broadcast stream just listen to the changes of the 
schema, and it's very infrequent and very lightweight.

In fact there are two ways to solve my problem,

the first one is a broadcast stream that listen to the change of the schema, 
and broadcast to every operator that will handle the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  -> OperatorC
  ^   ^  ^
  |||
  BroadcastStream

the second approach is that I have an operator that will join my data and 
schema together and send to the downstream operators:
 DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
  ^
  |
BroadcastStream


The benefits of the first approach is that the flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema will be broadcasted to each operator.
But the disadvantage of the first approache is that it breaks the operator 
chain, so operators may not be executed in the same slot and gain worse 
performance.

The second approach does not have the problem as the first one, but each 
message will carry its schema info among operators, it will cost about 2x for 
serialization and deserialization between operators.

Is there a better workaround that all the operators could notice the schema 
change and at the same time not breaking the operator chaining?

Thanks!



-- Original --
From:  "Piotr Nowojski"mailto:pi...@ververica.com>>;
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"mailto:paulhu...@easyops.cn>>;
Cc:  "user"mailto:user@flink.apache.org>>;
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

Hi,

Broadcasting will brake an operator chain. However my best guess is that Kafka 
source will be still a performance bottleneck in your job. Also Network 
exchanges add some measurable overhead only if your records are very 
lightweight and easy to process (for example if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results wether this is ok for you or not.

Piotrek

> On 6 Aug 2019, at 09:56, 黄兆鹏 
> mailto:paulhu...@easyops.cn>> wrote:
>
> Hi all,
> My flink job has dynamic schema of data, so I want to consume a schema kafka 
> topic and try to broadcast to every operator so that each operator could know 
> what kind of data it is handling.
>
> For example, the two streams just like this:
> OperatorA  ->  Oper

Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

2019-08-06 Thread Jungtaek Lim
Thanks Fabian for providing great input!

Regarding your feedback on the solution: yes, you're right, I realized I missed
out-of-order events, and as you said we would have to "split" an existing window
into two, which the current custom-window abstraction cannot help with.
(Flink would have no idea how aggregated events/intermediate values - the state -
should be assigned to the new windows. Unlike merging windows, this would have to
be handled manually.) A trigger-based approach might still be viable, but it would
also have to deal with the current watermark.

Honestly, the purpose of the experiment was to gauge the value of leveraging the
custom window API and custom triggers (more abstraction), rather than dealing with
a ProcessFunction (lower level, like flatMapGroupsWithState in Spark's case). If we
end up doing most of the work in a ProcessFunction anyway, there is less merit in
the additional complexity of the abstraction. I guess I just couldn't find a proper
use case for it.
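
As an illustration of that lower-level route, here is a minimal sketch of a per-key session handled directly in a KeyedProcessFunction: state plus an event-time timer, closing the session either on an inactivity gap or on an explicit end event. The Tuple3 event layout (user id, timestamp, end flag), the gap constant, and the count-only output are my own assumptions, and, like the window version, this simple sketch does not handle the out-of-order split case discussed above.

---
public class SessionFn
    extends KeyedProcessFunction<String, Tuple3<String, Long, Boolean>, Tuple2<String, Long>> {

  private static final long GAP = 10 * 60 * 1000L;   // inactivity gap (assumption)
  private transient ValueState<Long> count;           // events in the current session
  private transient ValueState<Long> timer;           // currently registered timer, if any

  @Override
  public void open(Configuration parameters) {
    count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    timer = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
  }

  @Override
  public void processElement(Tuple3<String, Long, Boolean> event, Context ctx,
                             Collector<Tuple2<String, Long>> out) throws Exception {
    long c = (count.value() == null ? 0L : count.value()) + 1;
    count.update(c);
    if (event.f2) {                                    // explicit "session end" event
      out.collect(Tuple2.of(ctx.getCurrentKey(), c));
      clear(ctx);
      return;
    }
    if (timer.value() != null) {
      ctx.timerService().deleteEventTimeTimer(timer.value());
    }
    long fireAt = event.f1 + GAP;
    ctx.timerService().registerEventTimeTimer(fireAt);
    timer.update(fireAt);
  }

  @Override
  public void onTimer(long ts, OnTimerContext ctx,
                      Collector<Tuple2<String, Long>> out) throws Exception {
    if (count.value() != null) {                       // gap expired: close the session
      out.collect(Tuple2.of(ctx.getCurrentKey(), count.value()));
    }
    count.clear();
    timer.clear();
  }

  private void clear(Context ctx) throws Exception {
    if (timer.value() != null) {
      ctx.timerService().deleteEventTimeTimer(timer.value());
    }
    count.clear();
    timer.clear();
  }
}
---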

By the way, I'd like to say thanks: I'm learning Flink with the new book "Stream
Processing with Apache Flink". :) Thanks for your amazing effort in
publishing such a nice book!

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Aug 5, 2019 at 10:21 PM Fabian Hueske  wrote:

> Hi Jungtaek,
>
> I would recommend to implement the logic in a ProcessFunction and avoid
> Flink's windowing API.
> IMO, the windowing API is difficult to use, because there are many pieces
> like WindowAssigner, Window, Trigger, Evictor, WindowFunction that are
> orchestrated by Flink.
> This makes it very hard to understand what exactly is going on and to
> ensure that no state is leaked.
>
> For example, I think your solution is not 100% correct, because a
> MergingWindowAssigner lacks the ability to split a window.
> In case of out-of-order events, you might have the situation that a LOG
> OUT event for 12:00:00 arrives after a game event for 12:00:01 was assigned
> to a window.
> In that case, you'd need to split the window and the game event at
> 12:00:01 would need to go to the next session window.
>
> As I said, I would use a ProcessFunction because it is a single function
> and provides access to state and timers which is all you need.
> The logic you would need to implement would be a bit more, but it would be
> much easier to reason about how the data is processed.
>
> Best,
> Fabian
>
>
> Am Mo., 5. Aug. 2019 um 05:18 Uhr schrieb Jungtaek Lim  >:
>
>> Thanks Dongwon to provide feedback and share your approach!
>>
>> I'm not sure it could be possible (not an expert), but if we could reset
>> intermediate result (aggregated) after processing "fire event", I guess it
>> would work as expected, as window would still expand even after "session
>> end", but it will provide same effect in point of "outputs". Nice approach!
>> I'll play with this approach too.
>>
>> Thanks again,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Mon, Aug 5, 2019 at 12:01 AM Dongwon Kim 
>> wrote:
>>
>>> Hi Jungtaek,
>>>
>>> I've faced a similar problem in the past; we need to calculate an
>>> aggregate upon receiving an end message from each user.
>>>
>>> While you're trying to solve problem by defining a custom window
>>> assigner, I took a different approach to the problem by implementing a
>>> custom trigger.
>>>
>>> You can see my implementation in the following link (but I'm not quite
>>> sure if my implementation could solve your case):
>>>
>>> https://github.com/eastcirclek/flink-examples/blob/master/src/main/scala/com/github/eastcirclek/examples/customtrigger/trigger/TrackingEarlyResultEventTimeTrigger.scala
>>>
>>> Best,
>>> Dongwon
>>>
>>> p.s. FYI, I presented the background of the problem and the general idea
>>> last year at FlinkForward 2017 Berlin. Hope this presentation helps you:
>>> https://www.youtube.com/watch?v=wPQWFy5JENw
>>>
>>>
>>>
>>> On Sun, Aug 4, 2019 at 10:57 PM Jungtaek Lim  wrote:
>>>
 Hi Flink users,

 I've been spending time to learn and play with Flink DataStream API,
 not an expert level but as a beginner. :)

 To play with custom window API, I just created a small example, session
 window based on fixed time gap, but indicate the type of event which may
 contain "end of session". I guess it's not unusual to see this kind of
 things (like manual logout and login) though I don't have concrete real use
 case.

 This is an implementation based on Flink DataStream API:
 https://gist.github.com/HeartSaVioR/1d865b1a444af1ef7cae201bbdb196b0

 Custom window works pretty well and I could leverage side output very
 easily. One thing leading the code a bit longer was new definition of
 TimeWindow (to deal with event type of "close session"). Even though I
 tried to leverage TimeWindow via inheritance, the code goes a bit verbose
 as I need to implement a new Serializer as well.
 (Actually it required to implement a new Trigger as well, but took
 workaround to leverage existing EventTimeTrigger.)

 Assuming this pattern is not unusual (it would be pretty OK if it

Re: Best way to access a Flink state entry from another Flink application

2019-08-06 Thread Mohammad Hosseinian
Hi Oytun,

Thanks and good to know about your planned features.

BR, Moe


On 06/08/2019 16:14, Oytun Tez wrote:
Hi Mohammad,

Queryable State works in some cases: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/queryable_state.html

As much as I know, this is the only way to access Flink's state from outside, 
until we have Savepoint API coming in 1.9.

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.
oy...@motaword.com — 
www.motaword.com


On Tue, Aug 6, 2019 at 9:52 AM Mohammad Hosseinian 
mailto:mohammad.hossein...@id1.de>> wrote:
Hi Alex,

Thanks for your reply. The application is streaming. The issue with using 
messaging channels for such kind of communication is the 'race condition'. I 
mean, when you have parallel channels of communication (one for the main flow 
of your streaming application and one for bringing 'stated/current' objects to 
desired processing nodes), then the order of messages are not preserved and it 
might lead to incorrect result of your application. That was the reason why I 
was wondering if there is any 'synchronous' way of accessing the Flink state.

BR, Moe


On 06/08/2019 13:25, Протченко Алексей wrote:
Hi Mohammad,

which types of applications do you mean? Streaming or batch ones? In terms of 
streaming ones queues like Kafka or RabbitMq between applications should be the 
best way I think.

Best regards,
Alex


Вторник, 6 августа 2019, 12:21 +02:00 от Mohammad Hosseinian 
:


Hi all,


We have a network of Flink applications. The whole cluster receives 
'state-update' messages from the outside, and there is one Flink application in 
our cluster that 'merges' these updates and creates the actual, most 
up-to-date, state of the 'data-objects' and passes it to the next process. It 
does this, using a stateful stream processing with a `KeyedProcessFunction` 
object. In our processing logic, there are nodes that require to access the 
actual state of the objects when they receive one or more 'object-id's from the 
previous Flink application. We do not propagate the actual-state of the objects 
since, not all types of the objects are relevant to all processes in the 
cluster, so we saved some network/storage overhead there.

The question is: for such scenario, what is the best way to expose the Flink 
state to another Flink application? I am aware of 'Queryable states', but I am 
not sure if this feature has been designed and is suitable for our use-case or 
not?


Thank you very much in advance.


BR, Moe



Re: Best way to access a Flink state entry from another Flink application

2019-08-06 Thread Oytun Tez
Hi Mohammad,

Queryable State works in some cases:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/queryable_state.html

As far as I know, this is the only way to access Flink's state from the
outside, until the Savepoint API arrives in 1.9.
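
For reference, a minimal sketch of how Queryable State is wired up on both sides; the state name, key/value types, proxy host, and job id below are placeholders.

---
// Job side: expose the state when creating the descriptor.
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("latest-value", Long.class);
descriptor.setQueryable("latest-value");

// Client side (flink-queryable-state-client-java), from any other process:
QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);
CompletableFuture<ValueState<Long>> future = client.getKvState(
    JobID.fromHexString("<job id>"),     // placeholder
    "latest-value",                      // the queryable name set above
    "some-key",                          // the key to look up
    BasicTypeInfo.STRING_TYPE_INFO,
    descriptor);
Long value = future.get().value();
---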

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Aug 6, 2019 at 9:52 AM Mohammad Hosseinian <
mohammad.hossein...@id1.de> wrote:

> Hi Alex,
>
> Thanks for your reply. The application is streaming. The issue with using
> messaging channels for such kind of communication is the 'race condition'.
> I mean, when you have parallel channels of communication (one for the main
> flow of your streaming application and one for bringing 'stated/current'
> objects to desired processing nodes), then the order of messages are not
> preserved and it might lead to incorrect result of your application. That
> was the reason why I was wondering if there is any 'synchronous' way of
> accessing the Flink state.
>
> BR, Moe
>
>
> On 06/08/2019 13:25, Протченко Алексей wrote:
>
> Hi Mohammad,
>
> which types of applications do you mean? Streaming or batch ones? In terms
> of streaming ones queues like Kafka or RabbitMq between applications should
> be the best way I think.
>
> Best regards,
> Alex
>
>
> Вторник, 6 августа 2019, 12:21 +02:00 от Mohammad Hosseinian
>  :
>
> Hi all,
>
>
> We have a network of Flink applications. The whole cluster receives
> 'state-update' messages from the outside, and there is one Flink
> application in our cluster that 'merges' these updates and creates the
> actual, most up-to-date, state of the 'data-objects' and passes it to the
> next process. It does this, using a stateful stream processing with a
> `KeyedProcessFunction` object. In our processing logic, there are nodes
> that require to access the actual state of the objects when they receive
> one or more 'object-id's from the previous Flink application. We do not
> propagate the actual-state of the objects since, not all types of the
> objects are relevant to all processes in the cluster, so we saved some
> network/storage overhead there.
>
> The question is: for such scenario, what is the best way to expose the
> Flink state to another Flink application? I am aware of 'Queryable states',
> but I am not sure if this feature has been designed and is suitable for our
> use-case or not?
>
>
> Thank you very much in advance.
>
>
> BR, Moe
>


Re: Best way to access a Flink state entry from another Flink application

2019-08-06 Thread Mohammad Hosseinian
Hi Alex,

Thanks for your reply. The application is streaming. The issue with using
messaging channels for this kind of communication is the race condition: when you
have parallel channels of communication (one for the main flow of your streaming
application and one for bringing the 'stated/current' objects to the desired
processing nodes), the order of messages is not preserved, which might lead to
incorrect results from your application. That is why I was wondering whether there
is any 'synchronous' way of accessing the Flink state.

BR, Moe


On 06/08/2019 13:25, Протченко Алексей wrote:
Hi Mohammad,

which types of applications do you mean? Streaming or batch ones? In terms of 
streaming ones queues like Kafka or RabbitMq between applications should be the 
best way I think.

Best regards,
Alex


Вторник, 6 августа 2019, 12:21 +02:00 от Mohammad Hosseinian 
:


Hi all,


We have a network of Flink applications. The whole cluster receives 
'state-update' messages from the outside, and there is one Flink application in 
our cluster that 'merges' these updates and creates the actual, most 
up-to-date, state of the 'data-objects' and passes it to the next process. It 
does this, using a stateful stream processing with a `KeyedProcessFunction` 
object. In our processing logic, there are nodes that require to access the 
actual state of the objects when they receive one or more 'object-id's from the 
previous Flink application. We do not propagate the actual-state of the objects 
since, not all types of the objects are relevant to all processes in the 
cluster, so we saved some network/storage overhead there.

The question is: for such scenario, what is the best way to expose the Flink 
state to another Flink application? I am aware of 'Queryable states', but I am 
not sure if this feature has been designed and is suitable for our use-case or 
not?


Thank you very much in advance.


BR, Moe



Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread Piotr Nowojski
Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

> On 6 Aug 2019, at 13:11, 黄兆鹏  wrote:
> 
> Hi, Piotrek,
> I previously considered your first advice(use union record type), but I found 
> that the schema would be only sent to one subtask of the operator(for 
> example, operatorA), and other subtasks of the operator are not aware of it. 
> In this case is there anything I have missed? 
> 
> Thank you!
> 
> 
> 
> 
> 
>  
> -- Original --
> From:  "Piotr Nowojski";
> Date:  Tue, Aug 6, 2019 06:57 PM
> To:  "黄兆鹏";
> Cc:  "user";
> Subject:  Re: Will broadcast stream affect performance because of the absence 
> of operator chaining?
>  
> Hi,
> 
> Have you measured the performance impact of braking the operator chain?
> 
> This is a current limitation of Flink chaining, that if an operator has two 
> inputs, it can be chained to something else (only one input operators are 
> chained together). There are plans for the future to address this issue.
> 
> As a workaround, besides what you have mentioned:
> - maybe your record type can be a union: type of Record or Schema (not Record 
> AND Schema), and upstream operators (operatorA) could just ignore/forward the 
> Schema. You wouldn’t need to send schema with every record.
> - another (ugly) solution, is to implement BroadcastStream input outside of 
> Flink, but then you might have issues with checkpointing/watermarking and it 
> just makes many things more complicated.
> 
> Piotrek
> 
>> On 6 Aug 2019, at 10:50, 黄兆鹏 > > wrote:
>> 
>> Hi Piotrek,
>> Thanks for your reply, my broadcast stream just listen to the changes of the 
>> schema, and it's very infrequent and very lightweight.
>> 
>> In fact there are two ways to solve my problem,
>> 
>> the first one is a broadcast stream that listen to the change of the schema, 
>> and broadcast to every operator that will handle the data, just as I posted 
>> originally.
>> DataStream: OperatorA  ->  OperatorB  -> OperatorC
>>   ^   ^  ^
>>   |||
>>   BroadcastStream
>> 
>> the second approach is that I have an operator that will join my data and 
>> schema together and send to the downstream operators:
>>  DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
>>   ^ 
>>   |   
>> BroadcastStream   
>> 
>> 
>> The benefits of the first approach is that the flink job does not have to 
>> transfer the schema with the real data records among operators, because the 
>> schema will be broadcasted to each operator.
>> But the disadvantage of the first approache is that it breaks the operator 
>> chain, so operators may not be executed in the same slot and gain worse 
>> performance.
>> 
>> The second approach does not have the problem as the first one, but each 
>> message will carry its schema info among operators, it will cost about 2x 
>> for serialization and deserialization between operators.
>> 
>> Is there a better workaround that all the operators could notice the schema 
>> change and at the same time not breaking the operator chaining?
>> 
>> Thanks!
>> 
>>  
>>  
>> -- Original --
>> From:  "Piotr Nowojski"mailto:pi...@ververica.com>>;
>> Date:  Tue, Aug 6, 2019 04:23 PM
>> To:  "黄兆鹏"mailto:paulhu...@easyops.cn>>;
>> Cc:  "user"mailto:user@flink.apache.org>>;
>> Subject:  Re: Will broadcast stream affect performance because of the 
>> absence of operator chaining?
>>  
>> Hi,
>> 
>> Broadcasting will brake an operator chain. However my best guess is that 
>> Kafka source will be still a performance bottleneck in your job. Also 
>> Network exchanges add some measurable overhead only if your records are very 
>> lightweight and easy to process (for example if you are using RocksDB then 
>> you can just ignore network costs).
>> 
>> Either way, you can just try this out. Pre populate your Kafka topic with 
>> some significant number of messages, run both jobs, compare the throughput 
>> and decide based on those results wether this is ok for you or not.
>> 
>> Piotrek 
>> 
>> > On 6 Aug 2019, at 09:56, 黄兆鹏 > > > wrote:
>> > 
>> > Hi all, 
>> > My flink job has dynamic schema of data, so I want to consume a schema 
>> > kafka topic and try to broadcast to every operator so that each operator 
>> > could know what kind of data it is handling.
>> > 
>> > For example, the two streams just like this:
>> > OperatorA  ->  OperatorB  -> OperatorC
>> >   ^   ^  ^
>> >   ||   |
>> >BroadcastStream
>> > 
>> > If the broadcast stream does not exist, OperatorA, OperatorB, OperatorC 
>> > are chai

Re: Restore state class not found exception in 1.8

2019-08-06 Thread Tzu-Li (Gordon) Tai
Hi Lasse,

I think the diagnosis here:
https://issues.apache.org/jira/browse/FLINK-13159 matches your problem.
This problem should be fixed in the next bugfix version for 1.8.x. We'll
also try to fix this for the upcoming 1.9.0 as well.

Cheers,
Gordon

On Mon, Jun 3, 2019 at 1:55 PM Lasse Nedergaard 
wrote:

> Hi Gordon
>
> To us it looks like the env.registerType call is needed when we write the
> savepoint. If we have an existing savepoint without the classes registered, it
> doesn’t work.
>
> We have only seen the exception in our own sink that store pending data in
> operator state through CheckpointedFunction interface and this sink isn’t
> used in all our jobs.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 3 Jun 2019, at 12:50, Tzu-Li (Gordon) Tai wrote:
>
> Hi Lasse,
>
> This is indeed a bit odd. I'll need to reproduce this locally before I can
> figure out the root problem. Please bear with me for a while, will get back
> to you on this.
>
> Meanwhile, you mentioned that you only had some jobs failing with the
> posted exception. Did you figure out any more details on why this was only
> partially happening?
>
> Cheers,
> Gordon
>
> On Tue, May 28, 2019 at 8:59 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi Gordon
>>
>> We have found a solution but not why it happens on 1.8.
>> For it to work we need to call
>> env.registerType(Reportmessage.class)
>>
>> Reportmessage extends ReportmessageBase and the state operator uses
>> ReportmessageBase.
>> So we need to register all the classes that extend a class used in
>> state. We don’t know why this is needed in 1.8.
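
(For reference, a minimal sketch of the workaround described above; the class names follow this thread, everything else is illustrative.)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Register every concrete subclass that can end up in state declared with the base type.
    env.registerType(Reportmessage.class);

    // The state descriptor itself keeps using the base class, e.g. in a CheckpointedFunction:
    ListStateDescriptor<ReportmessageBase> pendingData =
            new ListStateDescriptor<>("pending-data", ReportmessageBase.class);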
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>> On 28 May 2019, at 10:06, Tzu-Li (Gordon) Tai wrote:
>>
>> Hi Lasse,
>>
>> Did you move the class to a different namespace / package or changed to
>> be a nested class, across the Flink versions?
>> That would be the only cause I could reason about at the moment.
>>
>> If possible, could you also have a very minimal snippet / instructions on
>> how I can maybe reproduce this?
>> That might give me more insight.
>>
>> Cheers,
>> Gordon
>>
>> On Mon, May 27, 2019 at 7:52 PM Lasse Nedergaard <
>> lassenederga...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> When we restart some of our jobs from a savepoint we see the
>>> exception below. It only happens for some of our jobs and we didn't see it
>>> in 1.7.2. The class Flink can't find differs from job to job, and we are sure
>>> it's included in our fat JAR.
>>> As a side note we are on our way to use Avro instead of POJO, but are
>>> not there yet.
>>> If anyone have a clue what the root cause could be, and how to resolve
>>> it would be appreciated.
>>> Thanks in advance
>>>
>>> Lasse Nedergaard
>>>
>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
>>> state backend for StreamSink_609b5f7fc746f29234b038c121356a9b_(2/2) from 
>>> any of the 1 provided restore options.
>>> at 
>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>>> ... 5 more
>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
>>> when trying to restore operator state backend
>>> at 
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
>>> at 
>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>> at 
>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>> ... 7 more
>>> Caused by: java.lang.RuntimeExceptio

Re: Flink 1.8.1: Seeing {"errors":["Not found."]} when trying to access the Jobmanagers web interface

2019-08-06 Thread Ufuk Celebi
Hey Tobias,

out of curiosity: were you using the job/application cluster (as documented
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/docker.html#flink-job-cluster
)?

– Ufuk


On Tue, Aug 6, 2019 at 1:50 PM Kaymak, Tobias 
wrote:

> I was using Apache Beam and in the lib folder I had a JAR that was using
> Flink 1.7 in its POM. After bumping that to 1.8 it works :)
>
> On Tue, Aug 6, 2019 at 11:58 AM Kaymak, Tobias 
> wrote:
>
>> It completely works when using the docker image tag 1.7.2 - I just bumped
>> back and the web interface was there.
>>
>> On Tue, Aug 6, 2019 at 10:21 AM Kaymak, Tobias 
>> wrote:
>>
>>> Hello,
>>>
>>> after upgrading the docker image from version 1.7.2  to 1.8.1 and wiping
>>> out zookeeper completely I see
>>>
>>> {"errors":["Not found."]}
>>>
>>> when trying to access the webinterface of Flink. I can launch jobs from
>>> the cmdline and I can't spot any error in the logs (so far on level INFO).
>>> I tried adding the flink-runtime-web_2.12-1.8.1.jar as a dependency
>>> into the lib folder when building the Docker container, but this did not
>>> help either.
>>>
>>> Has anyone experienced this problem? Is my Flink config faulty or what
>>> could be the reason?
>>>
>>


Re: Flink 1.8.1: Seeing {"errors":["Not found."]} when trying to access the Jobmanagers web interface

2019-08-06 Thread Kaymak, Tobias
I was using Apache Beam and in the lib folder I had a JAR that was using
Flink 1.7 in its POM. After bumping that to 1.8 it works :)

On Tue, Aug 6, 2019 at 11:58 AM Kaymak, Tobias 
wrote:

> It completely works when using the docker image tag 1.7.2 - I just bumped
> back and the web interface was there.
>
> On Tue, Aug 6, 2019 at 10:21 AM Kaymak, Tobias 
> wrote:
>
>> Hello,
>>
>> after upgrading the docker image from version 1.7.2  to 1.8.1 and wiping
>> out zookeeper completely I see
>>
>> {"errors":["Not found."]}
>>
>> when trying to access the webinterface of Flink. I can launch jobs from
>> the cmdline and I can't spot any error in the logs (so far on level INFO).
>> I tried adding the flink-runtime-web_2.12-1.8.1.jar as a dependency into
>> the lib folder when building the Docker container, but this did not help
>> either.
>>
>> Has anyone experienced this problem? Is my Flink config faulty or what
>> could be the reason?
>>
>


[bug ?] PrometheusPushGatewayReporter register more then one JM

2019-08-06 Thread miki haiat
We have a standalone cluster with a PrometheusPushGatewayReporter
configuration.
It seems like we can't register more than one JM with Prometheus because of
naming uniqueness.

 WARN  org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter  -
There was a problem registering metric numRunningJobs.
java.lang.IllegalArgumentException: *Collector already registered that
provides name: flink_jobmanager_numRunningJobs*


Re: Best way to access a Flink state entry from another Flink application

2019-08-06 Thread Протченко Алексей
Hi Mohammad,

which types of applications do you mean? Streaming or batch ones? For streaming 
applications, queues like Kafka or RabbitMQ between the applications should be the 
best way, I think.
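
A rough sketch of that hand-off, assuming the merged state can be serialized to a string (the topic name, toJson() and the surrounding variables are illustrative, not from your setup):

    // Job 1: publish the merged object state to a Kafka topic
    mergedState
        .map(state -> state.toJson())
        .addSink(new FlinkKafkaProducer<>("object-state", new SimpleStringSchema(), producerProps));

    // Job 2: subscribe to the same topic in the other application
    DataStream<String> objectState =
        env.addSource(new FlinkKafkaConsumer<>("object-state", new SimpleStringSchema(), consumerProps));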

Best regards,
Alex


>On Tuesday, 6 August 2019, 12:21 +02:00, Mohammad Hosseinian wrote:
>
>Hi all,
>
>We have a network of Flink applications. The whole cluster receives 
>'state-update' messages from the outside, and there is one Flink application 
>in our cluster that 'merges' these updates and creates the actual, most 
>up-to-date, state of the 'data-objects'
> and passes it to the next process. It does this, using a stateful stream 
> processing with a `KeyedProcessFunction` object. In our processing logic, 
> there are nodes that require to access the actual state of the objects when 
> they receive one or more 'object-id's
> from the previous Flink application. We do not propagate the actual-state of 
> the objects since, not all types of the objects are relevant to all processes 
> in the cluster, so we saved some network/storage overhead there.
>The question is: for such scenario, what is the best way to expose the Flink 
>state to another Flink application? I am aware of 'Queryable states', but I am 
>not sure if this feature has been designed and is suitable for our use-case or 
>not?
>
>Thank you very much in advance.
>
>BR, Moe
>-- 
>Mohammad Hosseinian
>Software Developer
>Information Design One AG
>
>Phone +49-69-244502-0
>Fax +49-69-244502-10
>Web www.id1.de
>
>
>Information Design One AG, Baseler Strasse 10, 60329 Frankfurt am Main, Germany
>Registration: Amtsgericht Frankfurt am Main, HRB 52596
>Executive Board: Robert Peters, Benjamin Walther, Supervisory Board: Christian 
>Hecht

-- 
Протченко Алексей


Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread 黄兆鹏
Hi, Piotrek,
I previously considered your first suggestion (use a union record type), but I found 
that the schema would only be sent to one subtask of the operator (for example, 
OperatorA), and the other subtasks of the operator are not aware of it. 
Is there anything I have missed here? 


Thank you!








 
-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

 

Hi,

Have you measured the performance impact of breaking the operator chain?


This is a current limitation of Flink chaining: if an operator has two 
inputs, it cannot be chained to anything else (only one-input operators are 
chained together). There are plans to address this in the future.


As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution, is to implement BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking and it 
just makes many things more complicated.


Piotrek

On 6 Aug 2019, at 10:50, 黄兆鹏  wrote:

Hi Piotrek,
Thanks for your reply, my broadcast stream just listen to the changes of the 
schema, and it's very infrequent and very lightweight.


In fact there are two ways to solve my problem,


the first one is a broadcast stream that listen to the change of the schema, 
and broadcast to every operator that will handle the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  -> OperatorC
  ^   ^  ^
  |||
  BroadcastStream



the second approach is that I have an operator that will join my data and 
schema together and send to the downstream operators:
 DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
  ^ 
  |   
BroadcastStream






The benefits of the first approach is that the flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema will be broadcasted to each operator.
But the disadvantage of the first approache is that it breaks the operator 
chain, so operators may not be executed in the same slot and gain worse 
performance.


The second approach does not have the problem as the first one, but each 
message will carry its schema info among operators, it will cost about 2x for 
serialization and deserialization between operators.


Is there a better workaround that all the operators could notice the schema 
change and at the same time not breaking the operator chaining? 


Thanks!

 
 
-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

 

Hi,

Broadcasting will brake an operator chain. However my best guess is that Kafka 
source will be still a performance bottleneck in your job. Also Network 
exchanges add some measurable overhead only if your records are very 
lightweight and easy to process (for example if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results wether this is ok for you or not.

Piotrek 

> On 6 Aug 2019, at 09:56, 黄兆鹏  wrote:
> 
> Hi all, 
> My flink job has dynamic schema of data, so I want to consume a schema kafka 
> topic and try to broadcast to every operator so that each operator could know 
> what kind of data it is handling.
> 
> For example, the two streams just like this:
> OperatorA  ->  OperatorB  -> OperatorC
>   ^   ^  ^
>   ||   |
>BroadcastStream
> 
> If the broadcast stream does not exist, OperatorA, OperatorB, OperatorC are 
> chained together in one slot because they have the same parallelism so that 
> it can gain maximum performance.
> 
> And I was wondering that if the broadcast stream exists, will it affect the 
> performance? Or flink will still chain them together to gain maximum 
> performance? 
> 
> Thanks!

Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread Piotr Nowojski
Hi,

Have you measured the performance impact of breaking the operator chain?

This is a current limitation of Flink chaining: if an operator has two 
inputs, it cannot be chained to anything else (only one-input operators are 
chained together). There are plans to address this in the future.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution, is to implement BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking and it 
just makes many things more complicated.

Piotrek
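
(To make the first workaround a bit more concrete, here is a minimal, illustrative sketch; DataRecord, SchemaUpdate and the transform logic are placeholders, not types from the original job.)

    // One wrapper type flows through the whole pipeline; each element is either
    // a data record or a schema update, never both.
    public class RecordOrSchema {
        public DataRecord record;    // non-null for normal data elements
        public SchemaUpdate schema;  // non-null for schema updates

        public boolean isSchema() {
            return schema != null;
        }
    }

    // An upstream operator (e.g. OperatorA) that doesn't care about schemas just forwards them:
    public class OperatorA implements FlatMapFunction<RecordOrSchema, RecordOrSchema> {
        @Override
        public void flatMap(RecordOrSchema in, Collector<RecordOrSchema> out) {
            if (in.isSchema()) {
                out.collect(in);            // pass schema updates through untouched
            } else {
                out.collect(transform(in)); // normal per-record processing (placeholder)
            }
        }

        private RecordOrSchema transform(RecordOrSchema in) {
            return in; // placeholder for the real record logic
        }
    }

The intent is that every operator keeps a single input of the same type, so the chain between OperatorA, OperatorB and OperatorC can be preserved; the schema elements simply travel the data path together with the records.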

> On 6 Aug 2019, at 10:50, 黄兆鹏  wrote:
> 
> Hi Piotrek,
> Thanks for your reply, my broadcast stream just listen to the changes of the 
> schema, and it's very infrequent and very lightweight.
> 
> In fact there are two ways to solve my problem,
> 
> the first one is a broadcast stream that listen to the change of the schema, 
> and broadcast to every operator that will handle the data, just as I posted 
> originally.
> DataStream: OperatorA  ->  OperatorB  -> OperatorC
>   ^   ^  ^
>   |||
>   BroadcastStream
> 
> the second approach is that I have an operator that will join my data and 
> schema together and send to the downstream operators:
>  DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
>   ^ 
>   |   
> BroadcastStream
> 
> 
> The benefits of the first approach is that the flink job does not have to 
> transfer the schema with the real data records among operators, because the 
> schema will be broadcasted to each operator.
> But the disadvantage of the first approache is that it breaks the operator 
> chain, so operators may not be executed in the same slot and gain worse 
> performance.
> 
> The second approach does not have the problem as the first one, but each 
> message will carry its schema info among operators, it will cost about 2x for 
> serialization and deserialization between operators.
> 
> Is there a better workaround that all the operators could notice the schema 
> change and at the same time not breaking the operator chaining? 
> 
> Thanks!
> 
>  
>  
> -- Original --
> From:  "Piotr Nowojski";
> Date:  Tue, Aug 6, 2019 04:23 PM
> To:  "黄兆鹏";
> Cc:  "user";
> Subject:  Re: Will broadcast stream affect performance because of the absence 
> of operator chaining?
>  
> Hi,
> 
> Broadcasting will brake an operator chain. However my best guess is that 
> Kafka source will be still a performance bottleneck in your job. Also Network 
> exchanges add some measurable overhead only if your records are very 
> lightweight and easy to process (for example if you are using RocksDB then 
> you can just ignore network costs).
> 
> Either way, you can just try this out. Pre populate your Kafka topic with 
> some significant number of messages, run both jobs, compare the throughput 
> and decide based on those results wether this is ok for you or not.
> 
> Piotrek 
> 
> > On 6 Aug 2019, at 09:56, 黄兆鹏  wrote:
> > 
> > Hi all, 
> > My flink job has dynamic schema of data, so I want to consume a schema 
> > kafka topic and try to broadcast to every operator so that each operator 
> > could know what kind of data it is handling.
> > 
> > For example, the two streams just like this:
> > OperatorA  ->  OperatorB  -> OperatorC
> >   ^   ^  ^
> >   ||   |
> >BroadcastStream
> > 
> > If the broadcast stream does not exist, OperatorA, OperatorB, OperatorC are 
> > chained together in one slot because they have the same parallelism so that 
> > it can gain maximum performance.
> > 
> > And I was wondering that if the broadcast stream exists, will it affect the 
> > performance? Or flink will still chain them together to gain maximum 
> > performance? 
> > 
> > Thanks!



Best way to access a Flink state entry from another Flink application

2019-08-06 Thread Mohammad Hosseinian
Hi all,


We have a network of Flink applications. The whole cluster receives 
'state-update' messages from the outside, and there is one Flink application in 
our cluster that 'merges' these updates, creates the actual, most up-to-date 
state of the 'data-objects', and passes it to the next process. It does this 
using stateful stream processing with a `KeyedProcessFunction`. In our 
processing logic, there are nodes that need to access the actual state of the 
objects when they receive one or more 'object-id's from the previous Flink 
application. We do not propagate the actual state of the objects, since not all 
object types are relevant to all processes in the cluster; this saves some 
network/storage overhead.

The question is: for such a scenario, what is the best way to expose the Flink 
state to another Flink application? I am aware of 'queryable state', but I am 
not sure whether this feature was designed for, and is suitable for, our 
use case.
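
(For reference, exposing and querying such state with the Queryable State API looks roughly like the sketch below; the state name, key type, ObjectState class, host, port and job id are illustrative assumptions, and the queryable-state proxy has to be enabled on the cluster.)

    // Inside the job that owns the state: make the descriptor queryable.
    ValueStateDescriptor<ObjectState> descriptor =
            new ValueStateDescriptor<>("object-state", ObjectState.class);
    descriptor.setQueryable("object-state-query");

    // In the other application (a plain Java client, not necessarily a Flink job):
    QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);
    CompletableFuture<ValueState<ObjectState>> future = client.getKvState(
            jobId,                  // JobID of the job that owns the state
            "object-state-query",   // the name set via setQueryable(...)
            "some-object-id",       // the key, assuming String keys
            BasicTypeInfo.STRING_TYPE_INFO,
            descriptor);
    ObjectState current = future.get().value();   // error handling omitted in this sketch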


Thank you very much in advance.


BR, Moe

--

Mohammad Hosseinian
Software Developer
Information Design One AG

Phone +49-69-244502-0
Fax +49-69-244502-10
Web www.id1.de


Information Design One AG, Baseler Strasse 10, 60329 Frankfurt am Main, Germany
Registration: Amtsgericht Frankfurt am Main, HRB 52596
Executive Board: Robert Peters, Benjamin Walther, Supervisory Board: Christian 
Hecht


Re: Flink 1.8.1: Seeing {"errors":["Not found."]} when trying to access the Jobmanagers web interface

2019-08-06 Thread Kaymak, Tobias
It completely works when using the docker image tag 1.7.2 - I just bumped
back and the web interface was there.

On Tue, Aug 6, 2019 at 10:21 AM Kaymak, Tobias 
wrote:

> Hello,
>
> after upgrading the docker image from version 1.7.2  to 1.8.1 and wiping
> out zookeeper completely I see
>
> {"errors":["Not found."]}
>
> when trying to access the webinterface of Flink. I can launch jobs from
> the cmdline and I can't spot any error in the logs (so far on level INFO).
> I tried adding the flink-runtime-web_2.12-1.8.1.jar as a dependency into
> the lib folder when building the Docker container, but this did not help
> either.
>
> Has anyone experienced this problem? Is my Flink config faulty or what
> could be the reason?
>


Re:Pramaters in eclipse with Flink

2019-08-06 Thread Haibo Sun
Hi alaa.abutaha,


In fact, your problem is not related to Flink, but to how program arguments are 
specified in Eclipse. I think the following document will help you.


https://www.cs.colostate.edu/helpdocs/cmd.pdf
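
For example, in Eclipse you would put them under Run > Run Configurations... > Arguments > "Program arguments", roughly like this (the path and the access token below are placeholders):

  -local -input /path/to/flink-streaming-kmeans/src/test/resources/citibike-20180801-min.tsv -accesstoken <your_access_token> -clusters 5 -queryable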


Best,
Haibo


At 2019-07-26 22:02:48, "alaa"  wrote:
>Hallo 
> I run this example form GitHub 
>https://github.com/ScaleUnlimited/flink-streaming-kmeans
>
> but I am not familiar with eclipse and i got this error 
>
> 
>
>I dont know how and where i should put the following parameters:
>
>-local (to specify running Flink locally, versus on a real cluster)
>-input  (e.g.
>/path/to/flink-streaming-kmeans/src/test/resources/citibike-20180801-min.tsv)
>-accesstoken 
>-clusters  (5 or 10 are good values)
>-queryable (to enable calls to the API, on port 8085).
>
>Thank you
>
>
>
>--
>Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: getting an exception

2019-08-06 Thread Avi Levi
Yeap that was it (deploying 1.8.1 over 1.8.0 ) thanks !!!

On Mon, Aug 5, 2019 at 5:53 PM Gaël Renoux  wrote:

> *This Message originated outside your organization.*
> --
> Hi Avi and Victor,
>
> I just opened this ticket on JIRA:
> https://issues.apache.org/jira/browse/FLINK-13586
> 
> (I hadn't seen these e-mails). Backward compatibility is broken between
> 1.8.0 and 1.8.1 if you use Kafka connectors.
>
> Can you upgrade your flink-connector-kafka dependency to 1.8.1 ? It won't
> deploy on a 1.8.0 server any more, if that's a concern for you.
>
> Gaël
>
> On Mon, Aug 5, 2019 at 4:37 PM Wong Victor 
> wrote:
>
>> Hi Avi:
>>
>>
>>
>> It seems you are submitting your job with an older Flink version (< 1.8),
>> please check your flink-dist version.
>>
>>
>>
>> Regards,
>>
>> Victor
>>
>>
>>
>> *From: *Avi Levi 
>> *Date: *Monday, August 5, 2019 at 9:11 PM
>> *To: *user 
>> *Subject: *getting an exception
>>
>>
>>
>> Hi,
>>
>> I'm using Flink 1.8.1. our code is mostly using Scala.
>>
>> When I try to submit my job (on my local machine ) it crashes with the
>> error below (BTW on the IDE it runs perfectly).
>>
>> Any assistance would be appreciated.
>>
>> Thanks
>>
>> Avi
>>
>> 2019-08-05 12:58:03.783 [Flink-DispatcherRestEndpoint-thread-3] ERROR 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Unhandled 
>> exception.
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program 
>> caused an error:
>>
>> at 
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
>>
>> at 
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>
>> at 
>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>
>> at 
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>
>> at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.NoSuchMethodError: 
>> org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V
>>
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(FlinkKafkaProducer011.java:494)
>> 
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(FlinkKafkaProducer011.java:448)
>> 
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(FlinkKafkaProducer011.java:383)
>>
>> at 
>> com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer(KafkaImpl.scala:18)
>>
>> at 
>> com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer$(KafkaImpl.scala:18)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.queueHandlers.QueueHandlerImpl$.producer(QueueHandlerImpl.scala:13)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.StreamingJob$.delayedEndpoint$com$bluevoyant$lookalike$analytic$StreamingJob$1(StreamingJob.scala:42)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.StreamingJob$delayedInit$body.apply(StreamingJob.scala:14)
>>
>> at scala.Function0.apply$mcV$sp(Function0.scala:34)
>>
>> at scala.Function0.apply$mcV$sp$(Function0.scala:34)
>>
>> at 
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>
>> at scala.App.$anonfun$main$1$adapted(App.scala:76)
>>
>> at scala.collection.immutable.List.foreach(List.scala:388)
>>
>> at scala.App.main(App.scala:76)
>>
>> at scala.App.main$(App.scala:74)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.StreamingJob$.main(StreamingJob.scala:14)
>>
>> at 
>> com.bluevoyant.lookalike.analytic.StreamingJob.main(StreamingJob.scala)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>
>> at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>
>> at 
>> org.apache.flin

Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread 黄兆鹏
Hi Piotrek,
Thanks for your reply, my broadcast stream just listens to the changes of the 
schema, and they are very infrequent and very lightweight.


In fact there are two ways to solve my problem,


the first one is a broadcast stream that listens to the changes of the schema 
and broadcasts them to every operator that will handle the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  -> OperatorC
  ^   ^  ^
  |||
  BroadcastStream



the second approach is that I have an operator that will join my data and 
schema together and send them to the downstream operators:
 DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
  ^ 
  |   
BroadcastStream






The benefit of the first approach is that the Flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema is broadcast to each operator.
But the disadvantage of the first approach is that it breaks the operator 
chain, so operators may not be executed in the same slot, which hurts 
performance.


The second approach does not have that problem, but each 
message will carry its schema info between operators, which costs roughly 2x for 
serialization and deserialization between operators.


Is there a better workaround where all the operators can notice the schema 
change without breaking operator chaining? 


Thanks!
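
(A minimal, illustrative sketch of the second approach: one MergeSchemaOperator-style function connects to the broadcast stream and attaches the current schema to every record, and the enriched records then flow through the normal, chainable operators. RawRecord and the String schema type are assumptions, not types from the original job.)

    MapStateDescriptor<String, String> schemaDescriptor = new MapStateDescriptor<>(
            "schema", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    DataStream<Tuple2<String, RawRecord>> enriched = rawStream
        .connect(schemaStream.broadcast(schemaDescriptor))
        .process(new BroadcastProcessFunction<RawRecord, String, Tuple2<String, RawRecord>>() {
            @Override
            public void processElement(RawRecord record, ReadOnlyContext ctx,
                                        Collector<Tuple2<String, RawRecord>> out) throws Exception {
                // attach the latest known schema to every record
                String schema = ctx.getBroadcastState(schemaDescriptor).get("current");
                out.collect(Tuple2.of(schema, record));
            }

            @Override
            public void processBroadcastElement(String schema, Context ctx,
                                                Collector<Tuple2<String, RawRecord>> out) throws Exception {
                ctx.getBroadcastState(schemaDescriptor).put("current", schema);
            }
        });

    // OperatorA, OperatorB and OperatorC then consume `enriched`; they stay single-input
    // and can still be chained, at the cost of carrying the schema with every record.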

 
 
-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?

 

Hi,

Broadcasting will brake an operator chain. However my best guess is that Kafka 
source will be still a performance bottleneck in your job. Also Network 
exchanges add some measurable overhead only if your records are very 
lightweight and easy to process (for example if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results wether this is ok for you or not.

Piotrek 

> On 6 Aug 2019, at 09:56, 黄兆鹏  wrote:
> 
> Hi all, 
> My flink job has dynamic schema of data, so I want to consume a schema kafka 
> topic and try to broadcast to every operator so that each operator could know 
> what kind of data it is handling.
> 
> For example, the two streams just like this:
> OperatorA  ->  OperatorB  -> OperatorC
>   ^   ^  ^
>   ||   |
>BroadcastStream
> 
> If the broadcast stream does not exist, OperatorA, OperatorB, OperatorC are 
> chained together in one slot because they have the same parallelism so that 
> it can gain maximum performance.
> 
> And I was wondering that if the broadcast stream exists, will it affect the 
> performance? Or flink will still chain them together to gain maximum 
> performance? 
> 
> Thanks!

Re: An ArrayIndexOutOfBoundsException after a few message with Flink 1.8.1

2019-08-06 Thread Nicolas Lalevée
Hi Yun,

Indeed, that was it: the parallelism was set lower than what my custom partitioner 
was computing.

Thanks

Nicolas
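
(For reference, a minimal sketch of a custom Partitioner that always stays within [0, numPartitions) regardless of the downstream parallelism; the String key type is an assumption.)

    public class SafePartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            // Mask the sign bit rather than using Math.abs, which overflows for Integer.MIN_VALUE.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // usage: stream.partitionCustom(new SafePartitioner(), record -> record.getKey());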

On Tue, Aug 6, 2019, at 4:47 AM, Yun Gao wrote:
> Hi Nicolas:
> 
>  Are you using a custom partitioner? If so, you might need to check if the 
> Partitioners#partition has returned a value that is greater than or equal to 
> the parallelism of the downstream tasks. The expected return value should be 
> in the interval [0, the parallelism of the downstream task).
> 
> Best,
> Yun
> 
>> --
>> From:Nicolas Lalevée 
>> Send Time:2019 Aug. 5 (Mon.) 22:58
>> To:user 
>> Subject:An ArrayIndexOutOfBoundsException after a few message with Flink 
>> 1.8.1
>> 
>> Hi,
>> 
>> I have got a weird error after a few messages. I have first seen this error 
>> on a deployed Flink cluster 1.7.1. Trying to figure it out, I am trying with 
>> a local Flink 1.8.1. I still get this ArrayIndexOutOfBoundsException. I 
>> don't have a precise scenario to reproduce it, but it is happening often.
>> Any idea what could be going wrong there ?
>> 
>> The full stack trace:
>> 
>> Exception in thread "main" java.lang.RuntimeException: 
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>  at 
>> com.mycompany.myproject.controljob.ControlTopology.main(ControlTopology.java:61)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
>> execution failed.
>>  at 
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>  at 
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>  at 
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>  at 
>> com.mycompany.myproject.controljob.ControlTopology.run(ControlTopology.java:137)
>>  at 
>> com.mycompany.myproject.controljob.ControlTopology.main(ControlTopology.java:53)
>> Caused by: java.lang.RuntimeException: Index 2 out of bounds for length 2
>>  at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>  at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>  at 
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(Strea

Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread Piotr Nowojski
Hi,

Broadcasting will break an operator chain. However, my best guess is that the Kafka 
source will still be the performance bottleneck in your job. Also, network 
exchanges add some measurable overhead only if your records are very 
lightweight and easy to process (for example, if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre-populate your Kafka topic with a 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results whether this is OK for you or not.

Piotrek 

> On 6 Aug 2019, at 09:56, 黄兆鹏  wrote:
> 
> Hi all, 
> My flink job has dynamic schema of data, so I want to consume a schema kafka 
> topic and try to broadcast to every operator so that each operator could know 
> what kind of data it is handling.
> 
> For example, the two streams just like this:
> OperatorA  ->  OperatorB  -> OperatorC
>   ^   ^  ^
>   ||   |
>BroadcastStream
> 
> If the broadcast stream does not exist, OperatorA, OperatorB, OperatorC are 
> chained together in one slot because they have the same parallelism so that 
> it can gain maximum performance.
> 
> And I was wondering that if the broadcast stream exists, will it affect the 
> performance? Or flink will still chain them together to gain maximum 
> performance? 
> 
> Thanks!



Flink 1.8.1: Seeing {"errors":["Not found."]} when trying to access the Jobmanagers web interface

2019-08-06 Thread Kaymak, Tobias
Hello,

after upgrading the Docker image from version 1.7.2 to 1.8.1 and wiping
out ZooKeeper completely, I see

{"errors":["Not found."]}

when trying to access the web interface of Flink. I can launch jobs from the
command line and I can't spot any error in the logs (so far on level INFO). I
tried adding the flink-runtime-web_2.12-1.8.1.jar as a dependency into the
lib folder when building the Docker container, but this did not help either.

Has anyone experienced this problem? Is my Flink config faulty or what
could be the reason?


Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread 黄兆鹏
Hi all, 
My Flink job has a dynamic data schema, so I want to consume a schema Kafka 
topic and broadcast it to every operator, so that each operator knows 
what kind of data it is handling.


For example, the two streams just like this:
OperatorA  ->  OperatorB  -> OperatorC
  ^   ^  ^
  ||   |
   BroadcastStream


If the broadcast stream does not exist, OperatorA, OperatorB and OperatorC are 
chained together in one slot because they have the same parallelism, which 
gives maximum performance.


And I was wondering: if the broadcast stream exists, will it affect 
performance, or will Flink still chain the operators together for maximum 
performance? 


Thanks!
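
(For reference, the broadcast wiring discussed above usually looks roughly like the sketch below; the Event type, the descriptor and the schema handling are illustrative, not taken from the original job.)

    MapStateDescriptor<String, String> schemaDescriptor = new MapStateDescriptor<>(
            "schemas", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    BroadcastStream<String> schemaBroadcast = schemaStream.broadcast(schemaDescriptor);

    DataStream<Event> result = dataStream
        .connect(schemaBroadcast)
        .process(new BroadcastProcessFunction<Event, String, Event>() {
            @Override
            public void processElement(Event value, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
                String schema = ctx.getBroadcastState(schemaDescriptor).get("current");
                // interpret `value` according to `schema` here
                out.collect(value);
            }

            @Override
            public void processBroadcastElement(String schema, Context ctx, Collector<Event> out) throws Exception {
                // every parallel subtask of this operator receives the schema update
                ctx.getBroadcastState(schemaDescriptor).put("current", schema);
            }
        });

Each operator that needs the schema becomes such a two-input function, which is exactly why it can no longer be chained to its neighbours.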

Re:Re: How to write value only using flink's SequenceFileWriter?

2019-08-06 Thread Haibo Sun
Hi Liu Bo,


If you haven't customized serializations through the configuration item 
"io.serializations", the default serializer for Writable objects is 
org.apache.hadoop.io.serializer.WritableSerialization.WritableSerializer. As 
you said, when WritableSerializer serializes a NullWritable object, it doesn't 
actually write anything. So I suspect that the "(null)" you saw may be part of the 
value, not the key.




Best,
Haibo

At 2019-07-27 11:43:47, "Liu Bo"  wrote:

The file header says key is NullWritable: 

SEQ^F!org.apache.hadoop.io.NullWritable^Yorg.apache.hadoop.io.Text^A^A)org.apache.hadoop.io.compress.SnappyCodec


Might be a hadoop -text problem?


On Sat, 27 Jul 2019 at 11:07, Liu Bo  wrote:

Dear flink users, 


We're trying to switch from StringWriter to SequenceFileWriter to turn on 
compression. StringWriter writes value only and we want to keep that way.


AFAIK, you can use NullWritable in Hadoop writers to escape key so you only 
write the values. 


So I tried with NullWritable as following code:


   BucketingSink<Tuple2<NullWritable, Text>> hdfsSink =
       new BucketingSink<>("/data/cjv");
   hdfsSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd/HH", ZoneOffset.UTC));
   hdfsSink.setWriter(new SequenceFileWriter<NullWritable, Text>(
       "org.apache.hadoop.io.compress.SnappyCodec", SequenceFile.CompressionType.BLOCK));
   hdfsSink.setBatchSize(1024 * 1024 * 250);
   hdfsSink.setBatchRolloverInterval(20 * 60 * 1000);

   // joinedResults is assumed to be a DataStream<Tuple2<String, String>>, with f1 holding the JSON payload
   joinedResults.map(new MapFunction<Tuple2<String, String>, Tuple2<NullWritable, Text>>() {
       @Override
       public Tuple2<NullWritable, Text> map(Tuple2<String, String> value) throws Exception {
           return Tuple2.of(NullWritable.get(), new Text(value.f1));
       }
   }).addSink(hdfsSink).name("hdfs_sink").uid("hdfs_sink");


But the output file has the key as the string value (null)
eg:
(null)  {"ts":1564168038,"os":"android",...}


So my question is: how do I omit the key completely and write only the value with 
SequenceFileWriter?

Your help will be much of my appreciation.


--

All the best

Liu Bo




--

All the best

Liu Bo