Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
BTW, FLIP-75 is aimed at improving the user experience of the web UI.
@Yadong Xie, have we already considered unifying the ids across the different 
parts in FLIP-75? 

Best,
Zhijiang
--
From:Zhijiang 
Send Time:2020 Feb. 14 (Fri.) 13:03
To:Jiayi Liao 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re: Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

Let's move the further discussion onto the JIRA page. 
I do not have much time to work on this recently. If you want to take it, I 
can assign it to you and help review the PR if I have time then. Otherwise I 
can find someone else to work on it in the future.

Best,
Zhijiang


--
From:Jiayi Liao 
Send Time:2020 Feb. 14 (Fri.) 12:39
To:Zhijiang 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id


Hi Zhijiang,

It did confuse us when we were trying to locate the unfinished subtask in the 
checkpoint UI last time. I've created an issue [1] for this. 
@杨东晓 Do you have time to work on this?

[1]. https://issues.apache.org/jira/browse/FLINK-16051

Best Regards,
Jiayi Liao



At 2020-02-14 10:14:27, "Zhijiang"  wrote:
If the id is not consistent across different parts, it may be worth creating a 
JIRA ticket to improve the user experience.
If anyone wants to work on it, please ping me and I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply subtracting 1 from the checkpoint subtask_id gives 
the corresponding task subtask_id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the end-to-end duration of each subtask id in a 
checkpoint. 
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is simply using checkpoint subtask ID - 1 OK?
Thanks
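
For reference, the off-by-one mapping discussed above boils down to a single 
subtraction; a minimal sketch (the variable names are illustrative, not Flink API):

    // The Checkpoints tab numbers subtasks starting at 1; the task view starts at 0.
    int checkpointSubtaskId = 3;                    // as shown on the Checkpoints tab
    int taskSubtaskIndex = checkpointSubtaskId - 1; // matching index in the task's subtask list
    System.out.println("Checkpoint subtask " + checkpointSubtaskId
            + " maps to task subtask " + taskSubtaskIndex);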







Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
Let's move the further discussion onto the JIRA page. 
I do not have much time to work on this recently. If you want to take it, I 
can assign it to you and help review the PR if I have time then. Otherwise I 
can find someone else to work on it in the future.

Best,
Zhijiang


--
From:Jiayi Liao 
Send Time:2020 Feb. 14 (Fri.) 12:39
To:Zhijiang 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id


Hi Zhijiang,

It did confuse us when we were trying to locate the unfinished subtask in the 
checkpoint UI last time. I've created an issue [1] for this. 
@杨东晓 Do you have time to work on this?

[1]. https://issues.apache.org/jira/browse/FLINK-16051

Best Regards,
Jiayi Liao



At 2020-02-14 10:14:27, "Zhijiang"  wrote:
If the id is not consistent across different parts, it may be worth creating a 
JIRA ticket to improve the user experience.
If anyone wants to work on it, please ping me and I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply subtracting 1 from the checkpoint subtask_id gives 
the corresponding task subtask_id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the end-to-end duration of each subtask id in a 
checkpoint. 
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is simply using checkpoint subtask ID - 1 OK?
Thanks






Re:Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Jiayi Liao



Hi Zhijiang,




It did confuse us when we were trying to locate the unfinished subtask in the 
checkpoint UI last time. I've created an issue [1] for this. 

@杨东晓 Do you have time to work on this?




[1]. https://issues.apache.org/jira/browse/FLINK-16051




Best Regards,

Jiayi Liao








At 2020-02-14 10:14:27, "Zhijiang"  wrote:

If the id is not consistent across different parts, it may be worth creating a 
JIRA ticket to improve the user experience.
If anyone wants to work on it, please ping me and I can give a hand.


Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id


Hi


Yes, you are right. Simply subtracting 1 from the checkpoint subtask_id gives 
the corresponding task subtask_id.


Best
Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
 
Hi, I'm trying to figure out the end-to-end duration of each subtask id in a 
checkpoint.
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is simply using checkpoint subtask ID - 1 OK?
Thanks



Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
If the id is not consistent across different parts, it may be worth creating a 
JIRA ticket to improve the user experience.
If anyone wants to work on it, please ping me and I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply subtracting 1 from the checkpoint subtask_id gives 
the corresponding task subtask_id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the end-to-end duration of each subtask id in a 
checkpoint. 
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is simply using checkpoint subtask ID - 1 OK?
Thanks



Re: TableFactory support for WriteMode.OVERWRITE

2020-02-13 Thread Jingsong Li
Hi Flavio,

CSV still has several problems:
- The old CSV format is not standard CSV; we need to use the new one.
- For the filesystem connector in SQL, INSERT INTO needs to be append and
INSERT OVERWRITE needs to be overwrite.
- The filesystem connector with CSV needs to support partitions.

Hopefully these things can be resolved in 1.11.

Best,
Jingsong Lee

On Fri, Feb 14, 2020 at 10:44 AM Jark Wu  wrote:

> Hi Flavio,
>
> That's true. The OVERWRITE mode is not exposed to the csv table sink. There is
> already a JIRA issue discussing this, FLINK-15066 [1].
> I think we may support INSERT OVERWRITE for csv sink in 1.11 instead of
> passing a connector property to the sink factory.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/FLINK-15066
>
> On Thu, 13 Feb 2020 at 21:56, Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I was trying to use the new Table API to write into a CSV sink but I
>> discovered that I can't set the write mode to WriteMode.OVERWRITE. Is this
>> a missing feature or is it specifically designed? Is there any good reason
>> to not allow overwrite of file system connectors?
>>
>> Best,
>> Flavio
>>
>

-- 
Best, Jingsong Lee
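
As a stop-gap until the property is exposed through the factory, the overwrite mode 
can still be set when the CSV sink is constructed programmatically instead of via 
connector properties; a hedged sketch (the path and delimiter are placeholders, and 
the sink still has to be registered with the table environment before INSERT INTO 
can target it):

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.table.sinks.CsvTableSink;

    public class OverwriteCsvSinkSketch {
        public static CsvTableSink buildSink(String path) {
            // Hand the write mode to the constructor, bypassing the TableFactory for now.
            return new CsvTableSink(path, ",", 1, FileSystem.WriteMode.OVERWRITE);
        }
    }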


Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Yun Tang
Hi

Yes, you are right. Simply subtracting 1 from the checkpoint subtask_id gives 
the corresponding task subtask_id.

Best
Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id

Hi, I'm trying to figure out the end-to-end duration of each subtask id in a 
checkpoint.
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is simply using checkpoint subtask ID - 1 OK?
Thanks


Re: TableFactory support for WriteMode.OVERWRITE

2020-02-13 Thread Jark Wu
Hi Flavio,

That's true. The OVERWRITE mode is not exposed to the csv table sink. There is
already a JIRA issue discussing this, FLINK-15066 [1].
I think we may support INSERT OVERWRITE for csv sink in 1.11 instead of
passing a connector property to the sink factory.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15066

On Thu, 13 Feb 2020 at 21:56, Flavio Pompermaier 
wrote:

> Hi to all,
> I was trying to use the new Table API to write into a CSV sink but I
> discovered that I can't set the write mode to WriteMode.OVERWRITE. Is this
> a missing feature or is it specifically designed? Is there any good reason
> to not allow overwrite of file system connectors?
>
> Best,
> Flavio
>


The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread 杨东晓
Hi, I'm trying to figure out the end-to-end duration of each subtask id in a
checkpoint.
In the Flink web UI I noticed that job task subtask ids start from 0, while
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task
subtask id? Is simply using checkpoint subtask ID - 1 OK?
Thanks


Re:Flink 1.10 es sink exception

2020-02-13 Thread sunfulin
Can anyone share a little advice on the reason for this exception? When I switch 
to the old planner, the same SQL runs well. 











At 2020-02-13 16:07:18, "sunfulin"  wrote:

Hi guys,
When running the Flink SQL below, I hit an exception: 
"org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that 
Table has a full primary keys if it is updated". I am using the latest Flink 
1.10 release with the blink planner enabled, and the same logic runs well on the 
Flink 1.8.2 old planner. Is there a problem with the SQL usage, or could this be 
a bug? 




INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
  SELECT aggId, pageId, ts_min as ts,
  count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
  count(case when eventId = 'click' then 1 else null end) as clickCnt
  FROM
  (
SELECT
'ZL_001' as aggId,
pageId,
eventId,
recvTime,
ts2Date(recvTime) as ts_min
from kafka_zl_etrack_event_stream
where eventId in ('exposure', 'click')
  ) as t1
  group by aggId, pageId, ts_min






 

Re: 1.9 timestamp type default

2020-02-13 Thread Rui Li
Hi,

I don't think there's a config to change the default behavior. But you can
change the bridged class programmatically like:
DataTypes.TIMESTAMP(9).bridgedTo(java.sql.Timestamp.class)

On Fri, Feb 14, 2020 at 8:47 AM Fanbin Bu  wrote:

> Hi,
>
> According to
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/types.html#timestamp
> ,
> the default java bridge time for timestamp is java.time.LocalDateTime. Is
> there a setting that can change it to use
> java.sql.Timestamp instead?
>
> Thanks,
> Fanbin
>


-- 
Best regards!
Rui Li
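
To make the bridging explicit, here is a minimal sketch of the suggestion above 
(assumes the Flink 1.9 Table API on the classpath; the class name is made up):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class TimestampBridgingSketch {
        public static void main(String[] args) {
            // Default: TIMESTAMP(9) bridges to java.time.LocalDateTime.
            DataType byDefault = DataTypes.TIMESTAMP(9);
            // Explicitly bridge to java.sql.Timestamp instead, per field.
            DataType legacy = DataTypes.TIMESTAMP(9).bridgedTo(java.sql.Timestamp.class);
            System.out.println(byDefault + " vs " + legacy);
        }
    }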


1.9 timestamp type default

2020-02-13 Thread Fanbin Bu
Hi,

According to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/types.html#timestamp
,
the default java bridge time for timestamp is java.time.LocalDateTime. Is
there a setting that can change it to use
java.sql.Timestamp instead?

Thanks,
Fanbin


Re: Size of state for any known production use case

2020-02-13 Thread RKandoji
hey Marta,

Thank you!
This is going to be pretty useful, let me go over these and get back to you
if I have any questions.

RK



On Thu, Feb 13, 2020 at 6:21 AM Marta Paes Moreira 
wrote:

> Hi, Reva.
>
> If you are looking for the maximum known state size, I believe Alibaba is
> using Flink at the largest scale in production [1].
>
> There are also other examples of variable scale scattered across Flink
> Forward talks [2]. In particular, this Netflix talk [3] should be
> interesting to you.
>
> Marta
>
> [1]
> https://www.itnextsummit.com/wp-content/uploads/2019/11/Stephan_Ewen_Stream_Processing_Beyond_Streaming.pdf
>  (Slide
> 3)
> [2] https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/videos
> [3] https://www.youtube.com/watch?v=2C44mUPlx5o
>
> On Wed, Feb 12, 2020 at 10:42 PM RKandoji  wrote:
>
>> Hi Team,
>>
>> I've done a POC using Flink and planning to give a presentation about my
>> learnings and share the benefits of using Flink.
>>
>> I understand that companies are using Flink to handle Tera Bytes of
>> state, but it would be great if you could point me to any reference of a
>> company using Flink production for a known amount of state. Or any other
>> related links where I can get these details?
>>
>> Basically I want to provide the known maximum limit of state that can be
>> stored. This is needed because my use case requires performing stream joins
>> on unbounded data (although the data is unbounded, it's not going to be super
>> huge like 10TB)
>>
>>
>> Thanks,
>> Reva
>>
>


group by optimizations with sorted input

2020-02-13 Thread Richard Moorhead
In batch mode, if the input is sorted prior to a group-by operation, does Flink
forward the aggregated data early? Is there a way to prevent grouping
operations from buffering all data in a GBK operation in batch mode?


Test sink behaviour

2020-02-13 Thread David Magalhães
Hi, I've created a CustomSink that writes parquet files to S3. Inside the
`invoke` method I have a loop that checks whether S3 is down, and if it is, it
waits with exponential backoff until S3 is online again.

Now I want to write a test for this. I can execute everything and see that the
sink is doing what it is supposed to do, but I don't have a way to validate
that programmatically (in an integration test).

One possibility I was considering was to check the LazyLogger errors, to verify
that something was printed, but I can't mock the Logger since it is final. Since
I expose the number of errors as a counter, I was trying to find a way to access
it directly from Scala, but the only way I could find was via the REST API, and
that is kind of a hack.

Example:

- Get the Rest API port
with flinkCluster.getClusterClient.getFlinkConfiguration.getInteger("rest.port",
0)
- Get the jobId via http://localhost:61869/jobs/
- Get the verticeId via
http://localhost:61869/jobs/c3879cca4ba23ad734b2810ba0d73873
- Get the metric via
http://localhost:61869/jobs/c3879cca4ba23ad734b2810ba0d73873/vertices/0a448493b4782967b150582570326227/metrics/?get=0.Sink__Unnamed.errors_sink

Is there a better way to get the metric, or to test this?

Thanks
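
For reference, the REST lookup described in the steps above can be scripted; a rough 
sketch in plain Java (the port, job id, vertex id and metric name are placeholders 
taken from the example URLs, not values your cluster will produce):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class MetricLookupSketch {
        // Fetch the JSON body of a Flink REST endpoint as a string.
        static String get(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestMethod("GET");
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                StringBuilder body = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line);
                }
                return body.toString();
            }
        }

        public static void main(String[] args) throws Exception {
            String base = "http://localhost:61869";               // rest.port of the MiniCluster
            String jobId = "c3879cca4ba23ad734b2810ba0d73873";     // from GET /jobs/
            String vertexId = "0a448493b4782967b150582570326227";  // from GET /jobs/<jobId>
            // Read the sink's error counter; the JSON still has to be parsed by the test.
            String metrics = get(base + "/jobs/" + jobId + "/vertices/" + vertexId
                    + "/metrics/?get=0.Sink__Unnamed.errors_sink");
            System.out.println(metrics);
        }
    }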


Re: CEP with changing threshold

2020-02-13 Thread Kostas Kloudas
Hi Hemant,

Why not use simple connected streams: one containing the
measurements, and the other being the control stream with the
thresholds, which are updated from time to time?
Both will be keyed by the device class, to make sure that the
measurements and the thresholds for a specific device class go to
the same machines.

You keep the "current" thresholds in state as they arrive from the
control stream, and you also keep the measurements in a MapState
keyed by their timestamp.

When an element comes in on the measurements side, your
KeyedCoProcessFunction fetches the thresholds from the "control" state,
fetches from the measurements state all the elements of the last N
units of time, does the computation, and purges
measurements that are too old to be useful (so that your state does
not grow indefinitely).

This solution does not use CEP but it gives you the freedom to do any
optimisations related to your usecase.

I hope this helps,
Kostas

On Wed, Feb 12, 2020 at 10:40 AM hemant singh  wrote:
>
> Hello Flink Users,
>
> I have a requirement to generate alerts for metrics, for example: if CPU 
> utilization spikes, i.e. cpu_utilization > threshold (>90%), n times in 
> x minutes, then generate an alert. For this I am using the CEP module. However, 
> one of the requirements is that the threshold, as well as x and n above, can 
> differ per device. Moreover, it differs per device class, and it can change in 
> the future.
> I am thinking of using the Broadcast State pattern to enrich the metrics stream 
> with these thresholds & rules and use them later in the CEP pattern. One issue 
> is how to make sure the broadcast stream is updated when new threshold values 
> come in. My understanding is that if I introduce a watermark in the broadcast 
> stream when values change, the KeyedBroadcastProcessFunction will have the 
> latest values streamed.
> Is my understanding correct? If anyone has implemented something like this, 
> could you weigh in on whether this is the right way to do it?
>
> Thanks,
> Hemant
>
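
A minimal sketch of the connected-streams approach described above (the Measurement, 
Threshold and Alert types are made up for illustration, and only one value per 
timestamp is stored to keep the sketch short):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // Stream 1: measurements, stream 2: threshold updates; both keyed by device class.
    public class ThresholdAlertFunction
            extends KeyedCoProcessFunction<String, Measurement, Threshold, Alert> {

        private transient ValueState<Threshold> currentThreshold;
        private transient MapState<Long, Double> valuesByTime;

        @Override
        public void open(Configuration parameters) {
            currentThreshold = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("threshold", Threshold.class));
            valuesByTime = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("measurements", Types.LONG, Types.DOUBLE));
        }

        @Override
        public void processElement1(Measurement m, Context ctx, Collector<Alert> out) throws Exception {
            valuesByTime.put(m.timestamp, m.value);      // buffer the measurement

            Threshold t = currentThreshold.value();
            if (t == null) {
                return;                                  // no rule for this device class yet
            }

            // Count violations within the last windowMillis and purge older entries.
            long windowStart = m.timestamp - t.windowMillis;
            int violations = 0;
            List<Long> expired = new ArrayList<>();
            for (Map.Entry<Long, Double> e : valuesByTime.entries()) {
                if (e.getKey() < windowStart) {
                    expired.add(e.getKey());
                } else if (e.getValue() > t.limit) {
                    violations++;
                }
            }
            for (Long key : expired) {
                valuesByTime.remove(key);
            }
            if (violations >= t.minOccurrences) {
                out.collect(new Alert(ctx.getCurrentKey(), violations));
            }
        }

        @Override
        public void processElement2(Threshold t, Context ctx, Collector<Alert> out) throws Exception {
            currentThreshold.update(t);                  // control stream: keep only the latest rule
        }
    }

    class Measurement { public long timestamp; public double value; }

    class Threshold { public double limit; public long windowMillis; public int minOccurrences; }

    class Alert {
        final String deviceClass;
        final int violations;
        Alert(String deviceClass, int violations) {
            this.deviceClass = deviceClass;
            this.violations = violations;
        }
    }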


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
Ok many thanks again!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
You must switch to the Operator API to access the checkpointing lock. It was like 
that by design - the Operator API is not stable (@PublicEvolving) - and that's why 
we were able to deprecate and remove the `checkpointingLock` in Flink 1.10/1.11.

Piotrek

> On 13 Feb 2020, at 14:54, Salva Alcántara  wrote:
> 
> BTW, is it possible to get the checkpoint lock within the processElement
> method of a ProcessFunction or this is not possible and I must switch to the
> Operator API instead?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: AW: How Flink Kafka Consumer works when it restarts

2020-02-13 Thread Timothy Victor
What are the pros and cons of Kafka offset keeping vs. Flink offset
keeping?  Is one more reliable than the other?  Personally I prefer having
Flink manage it, since it is intrinsically tied to its checkpointing
mechanism. But I am interested in learning from others' experiences.

Thanks

Tim

On Thu, Feb 13, 2020, 12:39 AM Hegde, Mahendra 
wrote:

> Thanks Theo !
>
>
>
> *From: *"theo.diefent...@scoop-software.de" <
> theo.diefent...@scoop-software.de>
> *Date: *Thursday, 13 February 2020 at 12:13 AM
> *To: *"Hegde, Mahendra" , "user@flink.apache.org"
> 
> *Subject: *[External] AW: How Flink Kafka Consumer works when it restarts
>
>
>
> Hi Mahendra,
>
>
>
> Flink will regularly create checkpoints or manually triggered savepoints.
> This is data managed and stored by Flink and that data also contains the
> kafka offsets.
>
>
>
> When restarting, you can configure to restart from the last checkpoint and
> or savepoint.
>
>
>
> You can additionally configure Flink to commit the offsets to Kafka, again,
> on checkpoints only. You can then configure Flink to restart from the
> committed offsets if you don't let Flink restart from an existing
> checkpoint or savepoint, where it would first look to restore the
> offsets.
>
>
>
> Having the offsets loaded either from checkpoint, savepoint or kafka, it
> will directly communicate with Kafka and ask kafka to poll messages
> starting from those offsets.
>
>
>
> Best regards
>
> Theo
>
>
>
>
> Sent from my Huawei phone
>
>
>
>  Original message 
> From: "Hegde, Mahendra" 
> Date: Wed., 12 Feb. 2020, 17:50
> To: user@flink.apache.org
> Subject: How Flink Kafka Consumer works when it restarts
>
> Hi All,
>
>
>
> I am bit confused on Flink kafka consumer working.
>
> I read that Flink stores the kafka message offset in checkpoint and uses
> it in case if it restarts.
>
>
>
> Question is when exactly Flink is committing about successful consumption
> confirmation to kafka broker?
>
> And when Flink job restarts will it send last offset which is available in
> checkpoint to kafka broker to start consuming from that point ?
>
> Or Kafka broker will resume based on last committed offset information
> available?
>
> (I mean who manages the actual offset here, Kafka broker or the Flink
> client)
>
>
>
> Thanks
>
> Mahendra
>
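
A short sketch of the knobs discussed above, on the Flink consumer side (the topic, 
servers and group id are placeholders, and the exact consumer class depends on the 
Kafka connector version in use):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaOffsetSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // offsets are snapshotted into each checkpoint

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "my-group");            // placeholder

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
            // Only used when the job starts WITHOUT an existing checkpoint/savepoint:
            consumer.setStartFromGroupOffsets();
            // Report progress back to Kafka on completed checkpoints (for monitoring);
            // restores still prefer the offsets stored in the checkpoint itself.
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer).print();
            env.execute("kafka-offset-sketch");
        }
    }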


TableFactory support for WriteMode.OVERWRITE

2020-02-13 Thread Flavio Pompermaier
Hi to all,
I was trying to use the new Table API to write into a CSV sink but I
discovered that I can't set the write mode to WriteMode.OVERWRITE. Is this
a missing feature or is it specifically designed? Is there any good reason
to not allow overwrite of file system connectors?

Best,
Flavio


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
BTW, is it possible to get the checkpoint lock within the processElement
method of a ProcessFunction or this is not possible and I must switch to the
Operator API instead?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-13 Thread Oytun Tez
😍

On Thu, Feb 13, 2020 at 2:26 AM godfrey he  wrote:

> Congrats to everyone involved! Thanks, Yu & Gary.
>
> Best,
> godfrey
>
> On Thu, Feb 13, 2020 at 12:57 PM, Yu Li  wrote:
>
>> Hi Kristoff,
>>
>> Thanks for the question.
>>
>> About Java 11 support, please allow me to quote from our release note [1]:
>>
>> Lastly, note that the connectors for Cassandra, Hive, HBase, and Kafka
>> 0.8–0.11
>> have not been tested with Java 11 because the respective projects did not
>> provide
>> Java 11 support at the time of the Flink 1.10.0 release
>>
>> Which is the main reason for us to still make our docker image based on
>> JDK 8.
>>
>> Hope this answers your question.
>>
>> Best Regards,
>> Yu
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html
>>
>>
>> On Wed, 12 Feb 2020 at 23:43, KristoffSC 
>> wrote:
>>
>>> Hi all,
>>> I have a small question regarding 1.10
>>>
>>> Correct me if I'm wrong, but 1.10 should support Java 11 right?
>>>
>>> If so, then I noticed that docker images [1] referenced in [2] are still
>>> based on openjdk8 not Java 11.
>>>
>>> What's up with that?
>>>
>>> P.S.
>>> Congrats on releasing 1.10 ;)
>>>
>>> [1]
>>>
>>> https://github.com/apache/flink/blob/release-1.10/flink-container/docker/Dockerfile
>>> [2]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>> --
 --

Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
Glad that we could help :) Yes, Arvid’s response is spot on.

Piotrek

> On 13 Feb 2020, at 14:17, Salva Alcántara  wrote:
> 
> Many thanks for your detailed response Piotr, it helped a lot!
> 
> BTW, I got similar comments from Arvid Heise here:
> https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
Many thanks for your detailed response Piotr, it helped a lot!

BTW, I got similar comments from Arvid Heise here:
https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Encountered error while consuming partitions

2020-02-13 Thread Piotr Nowojski
Hi 刘建刚,

Could you explain how you fixed the problem in your case? Did you modify the 
Flink code to use `IdleStateHandler`?

Piotrek

> On 13 Feb 2020, at 11:10, 刘建刚  wrote:
> 
> Thanks for all the help. Following the advice, I have fixed the problem.
> 
>> On Feb 13, 2020, at 6:05 PM, Zhijiang wrote:
>> 
>> Thanks for reporting this issue and I also agree with the below analysis. 
>> Actually we encountered the same issue several years ago and solved it also 
>> via the netty idle handler.
>> 
>> Let's trace it via the ticket [1] as the following step.
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-16030 
>> 
>> 
>> Best,
>> Zhijiang
>> 
>> --
>> From:张光辉 mailto:beggingh...@gmail.com>>
>> Send Time:2020 Feb. 12 (Wed.) 22:19
>> To:Benchao Li mailto:libenc...@gmail.com>>
>> Cc:刘建刚 mailto:liujiangangp...@gmail.com>>; user 
>> mailto:user@flink.apache.org>>
>> Subject:Re: Encountered error while consuming partitions
>> 
>> Network can fail in many ways, sometimes pretty subtle (e.g. a high ratio of 
>> packet loss). 
>> 
>> The problem is that the long-lived TCP connection between the Netty client and 
>> server is lost; the server then fails to send messages to the client and shuts 
>> down the channel. The Netty client does not know that the connection has been 
>> dropped, so it keeps waiting. 
>> 
>> To detect whether a long-lived TCP connection is still alive on the Netty 
>> client and server, there are two options: TCP keepalives and heartbeats.
>> TCP keepalive defaults to 2 hours. When the error occurs, if you keep waiting 
>> for 2 hours, the Netty client will eventually trigger an exception and enter 
>> failover recovery.
>> If you want to detect a dead connection quickly, Netty provides 
>> IdleStateHandler, which uses a ping-pong mechanism. If the Netty client sends 
>> n consecutive ping messages and receives no pong message, it triggers an 
>> exception.
>>  
>> 
> 



Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Piotr Nowojski
Hi,

As Kostas has pointed out, the operator and UDF APIs are not thread safe and 
Flink always calls them from the same, single task thread. This also 
includes checkpointing state. Also as Kostas pointed out, the easiest way would 
be to try to use the AsyncWaitOperator. If that's not possible, you can implement 
your custom logic based on its code.

>  So, in the end,
> the `ProcessElement1` method is basically forwarding the events to this
> library and registering a callback so that, when a match is detected, the
> CoProcessFunction can emit an output event. For achieving this, the callback
> relies on a reference to the `out: Collector[T]` parameter in
> `ProcessElement1`.

In order to achieve this: to emit from a different thread:

Pre Flink 1.10

In the past (before Flink 1.10, so including Flink 1.9), multi-threaded 
operators were supposed to acquire the so-called "checkpointing lock". You can hold 
a reference to the output collector, but before emitting something, you have to 
acquire the `checkpointingLock`. Note that if you acquire it and don't release it 
for some period of time, the whole task will be blocked from making any progress.

Flink 1.10+

In Flink 1.10 the `checkpointLock` was deprecated and will be removed in Flink 
1.11. It is replaced by registering asynchronous runnable callbacks ("mails") 
that are executed by the task thread. So if you want to:
a) emit results produced by a custom thread, or
b) modify the operator's state as a result of some work done by a custom thread,
then in both cases the work has to be done inside the "mail" action. For 
example, the pattern for a) is:

1. The external thread creates record R1 to emit.
2. The external thread creates a "mail" to emit record R1 and enqueues it into 
the mailbox.
3. The task's thread picks up the mail, executes its code, and that code emits 
the record R1 from the task thread.

For both of those patterns, please take a look at the AsyncWaitOperator code in 
the respective Flink versions. Just keep in mind, that if you implement it 
using `checkpointingLock`, this will not work anymore in Flink 1.11.

Piotrek

> On 13 Feb 2020, at 10:56, Salva Alcántara  wrote:
> 
> I still need to get into the AsyncWaitOperator, but after taking a look at
> the Async I/O API, it seems that the normal use case is when you expect a
> result for each element in the input stream, so you register a callback
> together with a timeout for each input element. This is not exactly what my
> use case requires. In particular, when I send an event to the third party
> library, I might get a result...or not. The library is used for detecting
> certain patterns, so it is not as when you are querying a database, where
> you expect a result within a given time frame for each input element. In my
> case, it is more the other way around, most of the time you will not be
> expecting any outcome (think of anomaly detection). What I need is a way to
> collect the result (if any) from my third party library in my
> ProcessFunction, knowing that these outcomes will be exceptional compared
> with the cardinality of the input stream. After giving some extra thoughts,
> I don't know if the Async I/O pattern really suits my needs...
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
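
To make the pre-1.10 variant concrete, a minimal hedged sketch at the Operator API 
level (the method emitFromExternalThread and the way the library thread obtains a 
reference to the operator are made up for illustration):

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Pre-1.10 pattern only: a custom thread may emit downstream as long as it holds
    // the task's checkpoint lock. In 1.10+ the mailbox ("mail") pattern replaces this.
    public class ExternalEmitOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        @Override
        public void processElement(StreamRecord<String> element) {
            // Hand the element to the third-party library here (omitted). The library's
            // callback thread later calls emitFromExternalThread(...) with any match.
        }

        // Called from the third-party library's callback thread.
        public void emitFromExternalThread(String match) {
            Object lock = getContainingTask().getCheckpointLock();
            synchronized (lock) {
                // Safe: no checkpoint can run while this lock is held.
                output.collect(new StreamRecord<>(match));
            }
        }
    }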



Re: [ANNOUNCE] Apache Flink Python API(PyFlink) 1.9.2 released

2020-02-13 Thread Hequn Cheng
Thanks a lot for the release, Jincheng!
Also thanks to everyone that make this release possible!

Best,
Hequn

On Thu, Feb 13, 2020 at 2:18 PM Dian Fu  wrote:

> Thanks for the great work, Jincheng.
>
> Regards,
> Dian
>
> On Feb 13, 2020, at 1:32 PM, jincheng sun  wrote:
>
> Hi everyone,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink Python API(PyFlink) 1.9.2, which is the first release to PyPI for the
> Apache Flink Python API 1.9 series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
>
> https://pypi.org/project/apache-flink/1.9.2/#files
>
> Or installed using pip command:
>
> pip install apache-flink==1.9.2
>
> We would like to thank all contributors of the Apache Flink community who
> helped to verify this release and made this release possible!
>
> Best,
> Jincheng
>
>
>


Re: Size of state for any known production use case

2020-02-13 Thread Marta Paes Moreira
Hi, Reva.

If you are looking for the maximum known state size, I believe Alibaba is
using Flink at the largest scale in production [1].

There are also other examples of variable scale scattered across Flink
Forward talks [2]. In particular, this Netflix talk [3] should be
interesting to you.

Marta

[1]
https://www.itnextsummit.com/wp-content/uploads/2019/11/Stephan_Ewen_Stream_Processing_Beyond_Streaming.pdf
(Slide
3)
[2] https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/videos
[3] https://www.youtube.com/watch?v=2C44mUPlx5o

On Wed, Feb 12, 2020 at 10:42 PM RKandoji  wrote:

> Hi Team,
>
> I've done a POC using Flink and planning to give a presentation about my
> learnings and share the benefits of using Flink.
>
> I understand that companies are using Flink to handle Tera Bytes of state,
> but it would be great if you could point me to any reference of a company
> using Flink production for a known amount of state. Or any other related
> links where I can get these details?
>
> Basically I want to provide the known maximum limit of state that can be
> stored. This is needed because my use case requires performing stream joins
> on unbounded data (although the data is unbounded, it's not going to be super
> huge like 10TB)
>
>
> Thanks,
> Reva
>


Re: SSL configuration - default behaviour

2020-02-13 Thread Piotr Nowojski
Hi Krzysztof,

Thanks for the suggestion. It was kind of implied in the first sentence on the 
page already, but I’m fixing it [1] to make it more clear. 

Piotrek

[1] https://github.com/apache/flink/pull/11083 


> On 11 Feb 2020, at 08:22, Krzysztof Chmielewski 
>  wrote:
> 
> Thanks Robert,
> just a small suggestion: maybe change the documentation a little bit.
> 
> I'm not sure if it's only my impression, but from the sentence 
> "All internal connections are SSL authenticated and encrypted" I initially 
> thought that this was the default configuration.
> 
> Thanks,
> Krzysztof
> 
> On Mon, 10 Feb 2020 at 15:12, Robert Metzger  wrote:
> Hi,
> 
> thanks a lot for your message. By default, internal connections are not 
> encrypted.
> 
> On Fri, Feb 7, 2020 at 4:08 PM KristoffSC  > wrote:
> Hi,
> In documentation [1] we can read that
> 
> All internal connections are SSL authenticated and encrypted. The
> connections use mutual authentication, meaning both server and client side
> of each connection need to present the certificate to each other. The
> certificate acts effectively as a shared secret.
> 
> But is this a default behavior? Are internal connections encrypted by
> default?
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
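
For anyone landing on this thread: internal SSL has to be switched on explicitly. A 
minimal flink-conf.yaml excerpt (paths and passwords are placeholders; see the 
security page linked above for the full set of options):

    security.ssl.internal.enabled: true
    security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
    security.ssl.internal.truststore: /path/to/flink/conf/internal.truststore
    security.ssl.internal.keystore-password: keystore_password
    security.ssl.internal.key-password: key_password
    security.ssl.internal.truststore-password: truststore_password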



Re: Encountered error while consuming partitions

2020-02-13 Thread 刘建刚
Thanks for all the help. Following the advice, I have fixed the problem.

> On Feb 13, 2020, at 6:05 PM, Zhijiang  wrote:
> 
> Thanks for reporting this issue and I also agree with the below analysis. 
> Actually we encountered the same issue several years ago and solved it also 
> via the netty idle handler.
> 
> Let's trace it via the ticket [1] as the following step.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-16030 
> 
> 
> Best,
> Zhijiang
> 
> --
> From:张光辉 
> Send Time:2020 Feb. 12 (Wed.) 22:19
> To:Benchao Li 
> Cc:刘建刚 ; user 
> Subject:Re: Encountered error while consuming partitions
> 
> Network can fail in many ways, sometimes pretty subtle (e.g. a high ratio of 
> packet loss). 
> 
> The problem is that the long-lived TCP connection between the Netty client and 
> server is lost; the server then fails to send messages to the client and shuts 
> down the channel. The Netty client does not know that the connection has been 
> dropped, so it keeps waiting. 
> 
> To detect whether a long-lived TCP connection is still alive on the Netty 
> client and server, there are two options: TCP keepalives and heartbeats.
> TCP keepalive defaults to 2 hours. When the error occurs, if you keep waiting 
> for 2 hours, the Netty client will eventually trigger an exception and enter 
> failover recovery.
> If you want to detect a dead connection quickly, Netty provides 
> IdleStateHandler, which uses a ping-pong mechanism. If the Netty client sends 
> n consecutive ping messages and receives no pong message, it triggers an 
> exception.
>  
> 



Re: Encountered error while consuming partitions

2020-02-13 Thread Zhijiang
Thanks for reporting this issue and I also agree with the below analysis. 
Actually we encountered the same issue several years ago and solved it also via 
the netty idle handler.

Let's trace it via the ticket [1] as the following step.

[1] https://issues.apache.org/jira/browse/FLINK-16030

Best,
Zhijiang


--
From:张光辉 
Send Time:2020 Feb. 12 (Wed.) 22:19
To:Benchao Li 
Cc:刘建刚 ; user 
Subject:Re: Encountered error while consuming partitions

Network can fail in many ways, sometimes pretty subtle (e.g. a high ratio of packet 
loss). 

The problem is that the long-lived TCP connection between the Netty client and 
server is lost; the server then fails to send messages to the client and shuts down 
the channel. The Netty client does not know that the connection has been dropped, 
so it keeps waiting. 

To detect whether a long-lived TCP connection is still alive on the Netty client 
and server, there are two options: TCP keepalives and heartbeats.
TCP keepalive defaults to 2 hours. When the error occurs, if you keep waiting for 
2 hours, the Netty client will eventually trigger an exception and enter failover 
recovery.
If you want to detect a dead connection quickly, Netty provides IdleStateHandler, 
which uses a ping-pong mechanism. If the Netty client sends n consecutive ping 
messages and receives no pong message, it triggers an exception.
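
For reference, a minimal Netty-side sketch of the IdleStateHandler idea described 
above (not the actual FLINK-16030 change; the handler placement and the timeout are 
illustrative):

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;

    // Client-side pipeline: if nothing is read for 60 seconds, IdleStateHandler fires
    // an IdleStateEvent, which is turned into a closed channel so failover can kick in.
    public class IdleAwareInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline()
                    .addLast(new IdleStateHandler(60, 0, 0)) // readerIdle, writerIdle, allIdle (s)
                    .addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                                throws Exception {
                            if (evt instanceof IdleStateEvent) {
                                ctx.close(); // connection looks dead: close it and let recovery run
                            } else {
                                super.userEventTriggered(ctx, evt);
                            }
                        }
                    });
        }
    }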





Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-13 Thread Salva Alcántara
I still need to get into the AsyncWaitOperator, but after taking a look at
the Async I/O API, it seems that the normal use case is when you expect a
result for each element in the input stream, so you register a callback
together with a timeout for each input element. This is not exactly what my
use case requires. In particular, when I send an event to the third party
library, I might get a result...or not. The library is used for detecting
certain patterns, so it is not as when you are querying a database, where
you expect a result within a given time frame for each input element. In my
case, it is more the other way around, most of the time you will not be
expecting any outcome (think of anomaly detection). What I need is a way to
collect the result (if any) from my third party library in my
ProcessFunction, knowing that these outcomes will be exceptional compared
with the cardinality of the input stream. After giving some extra thoughts,
I don't know if the Async I/O pattern really suits my needs...



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dedup all data in stream

2020-02-13 Thread Kostas Kloudas
Hi Akshay,

Is your use case that the input stream consists of metrics from these
1000s of resources, the ProcessFunction aggregates
them in 2-minute windows and does some analysis on these metrics, and
this analysis may take more than 2 minutes, so you create backpressure on
the source?

If that is the case and the metric records are timestamped, then you can use
event time: you will have your metrics in the correct timestamp
order
and eventually your stream will catch up (assuming that you have
enough resources / parallelism).
If you want this analysis of the incoming metrics to be performed by
another thread and, while this is happening, to
ignore any other incoming records, then you should look towards the
AsyncIO direction that I posted previously.

This will give you fault tolerance and
asynchronous processing.

Cheers,
Kostas

On Wed, Feb 12, 2020 at 6:33 PM Akshay Shinde  wrote:
>
> Hi Kostas
>
> We are doing scans on 1000s of resources, which we want to do at some 
> interval, currently 2 minutes. Scanning is the same operation we want to 
> perform every 2 minutes to check whether everything is OK or not. Sometimes 
> this scan operation takes a lot of time, which results in lag, and in the stream 
> (which we produce from the source function) we get multiple sets of data for the 
> same 1000s of resources. In that case we are okay with performing the scan 
> operation only once for all the sets that are currently present in the stream.
>
> Parallelism for source function is 1 and for Process function its currently 2.
>
> Thanks for the response.
>
> —
> Akshay
>
> > On Feb 12, 2020, at 2:07 AM, Kostas Kloudas  wrote:
> >
> > Hi Akshay,
> >
> > Could you be more specific on what you are trying to achieve with this 
> > scheme?
> >
> > I am asking because if your source is too fast and you want it to slow
> > it down so that it produces data at the same rate as your process
> > function can consume them, then Flink's backpressure will eventually
> > do this.
> >
> > If you want your process function to discard incoming elements (and
> > not queue them) if it is in the process of processing another element,
> > then this implies a multithreaded process function and I would look
> > maybe towards the AsyncIO [1] pattern with the AsyncFunction somehow
> > setting a flag as busy while processing and as false when it is done
> > and ready to process the next element.
> >
> > Also, in order to help, I would need more information about the stream
> > being keyed or non-keyed and the parallelism of the source compared to
> > that of the process function.
> >
> > I hope this helps,
> > Kostas
> >
> > [1] 
> > https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html__;!!GqivPVa7Brio!NE2BACN8q5265oyZUvBg44u3mq7sGn96u3rPtcVbFq9DovpIa1KtilsCXW3mtYofoQw$
> >
> > On Wed, Feb 12, 2020 at 3:34 AM Akshay Shinde  
> > wrote:
> >>
> >> Hi Community
> >>
> >> In our Flink job, the source creates its own stream to emit n 
> >> objects every 2 minutes. In the process function, for each object 
> >> from the generated source stream, we do some operation which we expect 
> >> to finish within 2 minutes.
> >>
> >> Every 2 minutes we generate the same ’N’ objects in the stream, which the 
> >> process function will process. But in some cases the process function takes 
> >> longer, around 10 minutes. In that case the stream will contain 5 
> >> sets of the ’N’ objects, since the process function is busy for 10 minutes 
> >> while the source keeps adding ’N’ objects to the stream every 2 minutes. The 
> >> problem is we don’t want to process these objects 5 times; we want to process 
> >> only the latest ’N’ objects, once.
> >>
> >> Depending on how long the process function takes, this results in more or 
> >> less lag between the source and the process function in the job execution.
> >>
> >>
> >> Thanks in advance !!!
>
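
A minimal sketch of the AsyncIO-with-a-busy-flag idea mentioned above (the 
ResourceBatch and ScanResult types and the scan() body are stand-ins, and the 
function is meant to be wired in via AsyncDataStream with a capacity larger than 1):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Runs the scan asynchronously; any batch arriving while a scan is still in flight
    // is dropped, so each wave of 'N' objects is effectively processed at most once.
    public class DedupingScanFunction extends RichAsyncFunction<ResourceBatch, ScanResult> {

        private transient AtomicBoolean busy;

        @Override
        public void open(Configuration parameters) {
            busy = new AtomicBoolean(false);
        }

        @Override
        public void asyncInvoke(ResourceBatch batch, ResultFuture<ScanResult> resultFuture) {
            if (!busy.compareAndSet(false, true)) {
                resultFuture.complete(Collections.emptyList()); // scan in flight: drop this batch
                return;
            }
            CompletableFuture
                    .supplyAsync(() -> scan(batch))             // the long-running scan
                    .whenComplete((result, error) -> {
                        busy.set(false);
                        resultFuture.complete(error == null
                                ? Collections.singletonList(result)
                                : Collections.emptyList());
                    });
        }

        private ScanResult scan(ResourceBatch batch) {
            return new ScanResult();                            // placeholder for the real scan logic
        }
    }

    class ResourceBatch { }

    class ScanResult { }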


Re: Aggregation for last n seconds for each event

2020-02-13 Thread Kostas Kloudas
Hi Oleg,

With the MapState approach you can still fire on every
incoming element :)
You just iterate over the map state and find all the elements whose
timestamp (the key) lies between the timestamp of the current element (NOW)
and NOW-N.

Anyway, if Fanbin's solution works, then you can always use that!

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:18 PM Олег Бонарь  wrote:
>
> Hi Kostas,
>
> Thanks for your reply!
> Yes, you understand me correctly. However, I also want the stream to be keyed 
> to process it in parallel. I'm afraid the approach with MapState you 
> suggested doesn't really suite my use case because I need to fire on every 
> incoming event.
> Logically, Fanbin's "RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT 
> ROW" looks 100% like what I need, but I haven't tried it yet.
> Also wondering if it might be expressed in DataStream API.
>
>> On Wed, 12 Feb 2020 at 13:06, Kostas Kloudas  wrote:
>>
>> Hi Oleg,
>>
>> Could you be more specific on what do you mean by
>> "for events of last n seconds(time units in general) for every incoming 
>> event."?
>>
>> Do you mean that you have a stream of parallelism 1 and you want for
>> each incoming element to have your function fire with input the event
>> itself and all the events that arrived within the last N time units?
>> If this is the case, you can use a dummy key to key your stream to
>> have access to keyed state, then use Map State with key being the
>> timestamp and value being a list of the already seen elements with
>> that timestamp and whenever an element arrives, you can register a
>> timer to fire N time units in the future. Then, when the timer fires,
>> you can iterate over the map, fetch the elements you are interested
>> in, and clean-up whatever you will not need anymore.
>>
>> For an example you could look at [1].
>>
>> I hope this helps,
>> Kostas
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> On Tue, Feb 11, 2020 at 11:18 PM Fanbin Bu  wrote:
>> >
>> > can u do
>> > RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
>> >
>> > On Tue, Feb 11, 2020 at 12:15 PM oleg  wrote:
>> >>
>> >> Hi Community,
>> >>
>> >> I do streaming in event time and I want to preserve ordering and late
>> >> events. I have a use case where I need to fire an aggregation function
>> >> over the events of the last n seconds (time units in general) for every
>> >> incoming event.
>> >>
>> >> It seems to me that windowing is not suitable since it may be expressed
>> >> either in time or in events count, not "last n seconds for each single
>> >> event".
>> >>
>> >> Is there an idiomatic way to do this? Any examples or help are
>> >> appreciated. Thanks in advance.
>> >>
>> >>
>> >> Best regards,
>> >>
>> >> Oleg Bonar
>> >>
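
A minimal sketch of the per-event MapState approach described above, in the 
DataStream API (the Event type, the key type and the sum aggregate are made up for 
illustration; late events make the purge step more delicate than shown here):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // For every incoming event, emit the sum of all values seen on the same key within
    // the last N milliseconds, then drop entries that are too old to matter anymore.
    public class LastNSecondsSum extends KeyedProcessFunction<String, Event, Double> {

        private static final long WINDOW_MILLIS = 1_000L;   // "last n seconds", here n = 1

        private transient MapState<Long, Double> valuesByTimestamp;

        @Override
        public void open(Configuration parameters) {
            valuesByTimestamp = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("valuesByTimestamp", Types.LONG, Types.DOUBLE));
        }

        @Override
        public void processElement(Event e, Context ctx, Collector<Double> out) throws Exception {
            valuesByTimestamp.put(e.timestamp, e.value);     // one value per timestamp keeps it short

            long windowStart = e.timestamp - WINDOW_MILLIS;
            double sum = 0.0;
            List<Long> expired = new ArrayList<>();
            for (Map.Entry<Long, Double> entry : valuesByTimestamp.entries()) {
                if (entry.getKey() < windowStart) {
                    expired.add(entry.getKey());             // only safe to purge if events arrive
                } else {                                     // roughly in timestamp order
                    sum += entry.getValue();
                }
            }
            for (Long key : expired) {
                valuesByTimestamp.remove(key);
            }
            out.collect(sum);
        }
    }

    class Event { public long timestamp; public double value; }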


Flink 1.10 es sink exception

2020-02-13 Thread sunfulin
Hi guys,
When running the Flink SQL below, I hit an exception: 
"org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that 
Table has a full primary keys if it is updated". I am using the latest Flink 
1.10 release with the blink planner enabled, and the same logic runs well on the 
Flink 1.8.2 old planner. Is there a problem with the SQL usage, or could this be 
a bug? 




INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
  SELECT aggId, pageId, ts_min as ts,
  count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
  count(case when eventId = 'click' then 1 else null end) as clickCnt
  FROM
  (
SELECT
'ZL_001' as aggId,
pageId,
eventId,
recvTime,
ts2Date(recvTime) as ts_min
from kafka_zl_etrack_event_stream
where eventId in ('exposure', 'click')
  ) as t1
  group by aggId, pageId, ts_min