Re: Use different S3 access key for different S3 bucket

2024-01-18 Thread Jun Qin
Hi Qing,

The S3 credentials are associated with Flink SQL tables.

I assume you are talking about processing/joining two different tables, backed 
by two different S3 buckets. If so, you can provide different credentials for 
the different tables and then use the two tables in your pipeline.
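
For illustration only, here is a minimal sketch, assuming the table connector in 
use accepts per-table credential options in the WITH clause; the option keys 
('s3.access-key', 's3.secret-key'), the schemas, and the paths below are 
hypothetical placeholders, not confirmed Flink options:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TwoBucketJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Table backed by the first bucket, with its own (hypothetical) credential options.
        tEnv.executeSql(
                "CREATE TABLE table_a (id STRING, v DOUBLE) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 's3://bucket-a/data/',"
                        + " 'format' = 'json',"
                        + " 's3.access-key' = '<key-a>',"    // hypothetical per-table option
                        + " 's3.secret-key' = '<secret-a>')");

        // Table backed by the second bucket, with different credentials.
        tEnv.executeSql(
                "CREATE TABLE table_b (id STRING, name STRING) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 's3://bucket-b/data/',"
                        + " 'format' = 'json',"
                        + " 's3.access-key' = '<key-b>',"    // hypothetical per-table option
                        + " 's3.secret-key' = '<secret-b>')");

        // Use both tables in one pipeline, e.g. a join.
        tEnv.executeSql(
                "SELECT a.id, a.v, b.name FROM table_a a JOIN table_b b ON a.id = b.id")
            .print();
    }
}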

Jun
On 18. Jan 2024 at 11:32 +0100, Qing Lim wrote:
> Hi, I am using Flink SQL to create tables backed by S3 buckets.
>
> We are not using AWS S3, so we have to use access key and secret for Auth.
>
> My pipeline depends on 2 different buckets, each requiring different 
> credentials. Can Flink support this?
>
>
>
> Qing Lim | Marshall Wace LLP, George House, 131 Sloane Street, London, SW1X 
> 9AT | E-mail: q@mwam.com | Tel: +44 207 925 4865
>


Re: Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-16 Thread Jun Qin
Hi Ken,

> Broadcast state is weird in that it’s duplicated, apparently to avoid “hot 
> spots” when restoring from state. So I’m wondering how Flink handles the case 
> of restoring broadcast state when the parallelism increases.

The Flink doc is here: 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/broadcast_state/.
 Particularly,
> Upon scaling up, each task reads its own state, and the remaining tasks 
> (p_new-p_old) read checkpoints of previous tasks in a round-robin manner.

You could also consider enabling DEBUG logs (for the relevant classes) when you 
give it another try, to see what happens in the TMs. I also suggest checking all of 
your state storage metrics for any possible indication of throttling.

Thanks
Jun

> On Dec 16, 2022, at 4:48 PM, Ken Krugler  wrote:
> 
> Hi Jun,
> 
> Thanks for following up.
> 
> The state storage is internal at a client, and isn’t throttled.
> 
> Also restoring from the savepoint when we didn’t change the parallelism was 
> fine.
> 
> I didn’t see any errors in the TM logs, but I didn’t carefully inspect them - 
> I’ll do that when we give this another test.
> 
> Broadcast state is weird in that it’s duplicated, apparently to avoid “hot 
> spots” when restoring from state. So I’m wondering how Flink handles the case 
> of restoring broadcast state when the parallelism increases.
> 
> Regards,
> 
> — Ken
>  
> 
>> On Dec 15, 2022, at 4:33 PM, Jun Qin wrote:
>> 
>> Hi Ken,
>> 
>> Without knowing the details, the first thing I would suggest checking is 
>> whether you have reached a threshold configured in your state 
>> storage (e.g., S3) and therefore your further downloads were throttled. Checking 
>> your storage metrics or logs should help confirm whether this is the case.
>> 
>> In addition, in those TMs where the restarting was slow, do you see anything 
>> suspicious in the logs, e.g., reconnecting?
>> 
>> Thanks
>> Jun
>> 
>> 
>> 
>> 
>> Sent from my phone
>> 
>> 
>>  Original message 
>> From: Ken Krugler
>> Date: Wed, Dec 14, 2022, 19:32
>> To: User <user@flink.apache.org>
>> Subject: Slow restart from savepoint with large broadcast state when
>> increasing parallelism
>> Hi all,
>> 
>> I have a job with a large amount of broadcast state (62MB).
>> 
>> I took a savepoint when my workflow was running with parallelism 300.
>> 
>> I then restarted the workflow with parallelism 400.
>> 
>> The first 297 sub-tasks restored their broadcast state fairly quickly, but 
>> after that it slowed to a crawl (maybe 2 sub-tasks finished per minute)
>> 
>> After 10 minutes we killed the job, so I don’t know if it would have 
>> ultimately succeeded.
>> 
>> Is this expected? Seems like it could lead to a bad situation, where it 
>> would take an hour to restart the workflow.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>> 
> 
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
> 
> 
> 



Re: Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-15 Thread Jun Qin
Hi Ken,

Without knowing the details, the first thing I would suggest checking is whether 
you have reached a threshold configured in your state storage (e.g., S3) and 
therefore your further downloads were throttled. Checking your storage metrics or 
logs should help confirm whether this is the case.

In addition, in those TMs where the restart was slow, do you see anything 
suspicious in the logs, e.g., reconnecting?

Thanks
Jun

Sent from my phone

 Original message 
From: Ken Krugler
Date: Wed, Dec 14, 2022, 19:32
To: User
Subject: Slow restart from savepoint with large broadcast state when increasing 
parallelism

Hi all,

I have a job with a large amount of broadcast state (62MB).

I took a savepoint when my workflow was running with parallelism 300.

I then restarted the workflow with parallelism 400.

The first 297 sub-tasks restored their broadcast state fairly quickly, but after 
that it slowed to a crawl (maybe 2 sub-tasks finished per minute).

After 10 minutes we killed the job, so I don’t know if it would have ultimately 
succeeded.

Is this expected? Seems like it could lead to a bad situation, where it would take 
an hour to restart the workflow.

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch

Re: videos Flink Forward San Francisco 2022

2022-10-11 Thread Jun Qin
Hi 

Totally agree. Rest assured that it was venue limitations and post-pandemic 
organizational challenges that meant there were no videos this year. Thanks a lot 
for the feedback, and please let's stay positive and not draw the wrong 
conclusions.

Thanks
Jun

> On Oct 10, 2022, at 2:39 PM, guenterh.lists  wrote:
> 
> Really very sad - as far as I know this is happening for the first time. Is this 
> the attitude of the new Ververica?
> 
> Hopefully Immerok may resume the open mentality of data Artisans.
> 
> Günter 
> 
> On 10.10.22 11:26, Martijn Visser wrote:
>> Hi Günter,
>> 
>> I've understood that only the keynotes were recorded and not the other 
>> sessions.
>> 
>> Best regards,
>> 
>> Martijn
>> 
>> On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists wrote:
>> Sorry if this question was already posted
>> 
>> So far, only a few videos of the conference have been published (mainly the 
>> keynotes):
>> https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB 
>> 
>> 
>> Are the other presentations not going to be published?
>> 
>> Günter
>> 



Re: Fail to build Flink 1.15.1

2022-09-10 Thread Jun Qin
Thanks Chesnay, 

I wanted to check out the tagged release 1.15.1. I did it this way:
git clone --depth 1 --branch release-1.15.1 git@github.com:apache/flink.git

This seems to have caused the problem. With the same Java/Maven, I can build the 
1.15 branch.


> On Sep 9, 2022, at 11:58 PM, Chesnay Schepler  wrote:
> 
> Hmm... we've only seen that error in older Flink versions: 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/flinkdev/ide_setup/#compilation-fails-with-cannot-find-symbol-symbol-method-defineclass-location-class-sunmiscunsafe
> 
> Please double-check whether you actually checked out 1.15.1; I can't find any 
> reference to sun.misc.Unsafe in the 1.15.1 version of the mentioned class: 
> https://github.com/apache/flink/blob/release-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
> 
> On 09/09/2022 22:01, Jun Qin wrote:
>> I have an issue when building a clean Flink 1.15.1 on macOS with: 
>> mvn clean install -DskipTests -Dfast
>> 
>> % echo $JAVA_HOME
>> /usr/local/Cellar/openjdk@11/11.0.16.1
>> 
>> % mvn -version
>> Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
>> Maven home: /usr/local/Cellar/maven/3.8.6/libexec
>> Java version: 11.0.16.1, vendor: Homebrew, runtime: 
>> /usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home
>> Default locale: en_US, platform encoding: UTF-8
>> OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"
>> 
>> % java -version
>> openjdk version "11.0.16.1" 2022-08-12
>> OpenJDK Runtime Environment Homebrew (build 11.0.16.1+0)
>> OpenJDK 64-Bit Server VM Homebrew (build 11.0.16.1+0, mixed mode)
>> 
>> It failed with:
>> [ERROR] Failed to execute goal 
>> org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) 
>> on project flink-test-utils-junit: Compilation failure
>> [ERROR] 
>> /Workspace/flink-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[244,53]
>>  cannot find symbol
>> [ERROR]   symbol:   method 
>> defineClass(java.lang.String,byte[],int,int,java.lang.ClassLoader,java.security.ProtectionDomain)
>> [ERROR]   location: class sun.misc.Unsafe
>> 
>> I also tried with a downloaded Maven 3.2.5 binary (Maven 3.2 has been 
>> disabled in brew):
>> % ~/Downloads/apache-maven-3.2.5/bin/mvn -version
>> Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 
>> 2014-12-14T18:29:23+01:00)
>> Maven home: /Users/jqin/Downloads/apache-maven-3.2.5
>> Java version: 11.0.16.1, vendor: Homebrew
>> Java home: 
>> /usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home
>> Default locale: en_US, platform encoding: UTF-8
>> OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"
>> 
>> it failed at the same place with the same error message.
>> 
>> Is there anything I did wrong? 
>> 
>> Jun
> 



Fail to build Flink 1.15.1

2022-09-09 Thread Jun Qin
I have an issue when building a clean Flink 1.15.1 on macOS with: 
mvn clean install -DskipTests -Dfast

% echo $JAVA_HOME
/usr/local/Cellar/openjdk@11/11.0.16.1

% mvn -version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /usr/local/Cellar/maven/3.8.6/libexec
Java version: 11.0.16.1, vendor: Homebrew, runtime: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"

% java -version
openjdk version "11.0.16.1" 2022-08-12
OpenJDK Runtime Environment Homebrew (build 11.0.16.1+0)
OpenJDK 64-Bit Server VM Homebrew (build 11.0.16.1+0, mixed mode)

It failed with:
[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on 
project flink-test-utils-junit: Compilation failure
[ERROR] 
/Workspace/flink-1.15.1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[244,53]
 cannot find symbol
[ERROR]   symbol:   method 
defineClass(java.lang.String,byte[],int,int,java.lang.ClassLoader,java.security.ProtectionDomain)
[ERROR]   location: class sun.misc.Unsafe

I also tried with a downloaded Maven 3.2.5 binary (Maven 3.2 has been disabled 
in brew):
% ~/Downloads/apache-maven-3.2.5/bin/mvn -version
Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 
2014-12-14T18:29:23+01:00)
Maven home: /Users/jqin/Downloads/apache-maven-3.2.5
Java version: 11.0.16.1, vendor: Homebrew
Java home: 
/usr/local/Cellar/openjdk@11/11.0.16.1/libexec/openjdk.jdk/Contents/Home
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "12.5.1", arch: "x86_64", family: "mac"

it failed at the same place with the same error message.

Is there anything I did wrong? 

Jun

Re: Issues with watermark alignment in Flink 1.15

2022-07-14 Thread Jun Qin
Found the reason: watermark alignment does not work together with 
.withIdleness(Duration.ofSeconds(1)).

Isn't this a valid scenario: one subtask has multiple input streams/channels, 
where some are idle and others have large watermark skews?
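
For reference, a minimal sketch of the same strategy with the idleness call removed, 
i.e. the combination that the finding above implies should work (my reconstruction, 
not a verified fix; MyEvent and outOfOrderness stand in for the type and value used 
in the snippet quoted below):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> watermarkStrategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(
                        Duration.ofMillis(outOfOrderness))
                .withWatermarkAlignment(
                        "wm-group", Duration.ofSeconds(10), Duration.ofSeconds(1))
                .withTimestampAssigner(
                        (element, timestamp) -> element.getTimestamp());
                // note: no .withIdleness(...), which is the call that conflicted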

In addition, do we expect the watermark update interval in
.withWatermarkAlignment("wm-group", maxDrift, updateInterval)
to be at the millisecond level? If so, the following log message should be at the 
DEBUG level, I think,
2022-07-10 06:53:35,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]

to avoid streaming logs filling up the disk space.

Thanks
Jun


> On Jul 10, 2022, at 9:10 AM, Jun Qin  wrote:
> 
> Hi All
> 
> I am trying watermark alignment in Flink 1.15 with:
> 
> watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
> Duration.ofMillis(outOfOrderness))
> .withWatermarkAlignment("wm-group", Duration.ofSeconds(10), 
> Duration.ofSeconds(1))
> .withTimestampAssigner(
> (element, timestamp) -> element.getTimestamp())
> .withIdleness(Duration.ofSeconds(1));
> 
> And got the following in DEBUG logs:
> 2022-07-10 06:53:35,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
> 2022-07-10 06:53:36,606 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from 
> subTaskId=2
> 2022-07-10 06:53:36,619 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from 
> subTaskId=1
> 2022-07-10 06:53:36,639 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from 
> subTaskId=3
> 2022-07-10 06:53:36,702 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from 
> subTaskId=0
> 2022-07-10 06:53:36,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
> 2022-07-10 06:53:37,229 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update 
> lock acquire time to keep lease
> 2022-07-10 06:53:37,237 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
> TryAcquireOrRenew return success
> 2022-07-10 06:53:37,237 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
> Successfully renewed lease
> 2022-07-10 06:53:37,603 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:37,605 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:37,616 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:37,630 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:37,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 
> 3]
> 2022-07-10 06:53:38,603 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:38,604 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:38,616 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:38,630 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:38,713 INFO  
> org.

Issues with watermark alignment in Flink 1.15

2022-07-10 Thread Jun Qin
Hi All

I am trying watermark alignment in Flink 1.15 with:

watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
Duration.ofMillis(outOfOrderness))
.withWatermarkAlignment("wm-group", Duration.ofSeconds(10), 
Duration.ofSeconds(1))
.withTimestampAssigner(
(element, timestamp) -> element.getTimestamp())
.withIdleness(Duration.ofSeconds(1));

And got the following in DEBUG logs:
2022-07-10 06:53:35,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
2022-07-10 06:53:36,606 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from subTaskId=2
2022-07-10 06:53:36,619 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from subTaskId=1
2022-07-10 06:53:36,639 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from subTaskId=3
2022-07-10 06:53:36,702 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from subTaskId=0
2022-07-10 06:53:36,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:37,229 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update 
lock acquire time to keep lease
2022-07-10 06:53:37,237 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
TryAcquireOrRenew return success
2022-07-10 06:53:37,237 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
Successfully renewed lease
2022-07-10 06:53:37,603 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=2
2022-07-10 06:53:37,605 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=3
2022-07-10 06:53:37,616 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=1
2022-07-10 06:53:37,630 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=0
2022-07-10 06:53:37,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:38,603 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=2
2022-07-10 06:53:38,604 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=3
2022-07-10 06:53:38,616 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=1
2022-07-10 06:53:38,630 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=0
2022-07-10 06:53:38,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]


Then it stays at maxAllowedWatermark=-9223372036854765809 all the time. The 
watermark looks correct at the beginning, then changes to something related to 
Long.MAX_VALUE… Feels like an overflow issue.

As soon as I remove the .withWatermarkAlignment() call, everything works fine.

Any idea? 

Thanks
Jun




Status of File Sink Common (flink-file-sink-common)

2022-05-30 Thread Jun Qin
Hi,  

Has File Sink Common (flink-file-sink-common) been dropped? If so, since which 
version? I do not seem to find anything related in the release notes of 1.13.x, 
1.14.x and 1.15.0.

Thanks
Jun

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Jun Qin
Hi Dongwon

Did you override AsyncFunction#timeout()?  If so, did you call 
resultFuture.complete()/completeExceptionally() in your override?  Not calling 
them can result in checkpoint timeout.
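
For illustration, a minimal sketch (with assumed String input/output types, not your 
actual function) of a timeout() override that always completes the future:

import java.util.Collections;
import java.util.concurrent.TimeoutException;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MyAsyncFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // Issue the real async request here and complete resultFuture in its callback;
        // this sketch completes immediately to stay self-contained.
        resultFuture.complete(Collections.singleton(input));
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // Always complete the future, e.g. drop the timed-out record ...
        resultFuture.complete(Collections.emptyList());
        // ... or fail it explicitly instead:
        // resultFuture.completeExceptionally(new TimeoutException("async request timed out"));
    }
}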

Thanks
Jun


> On Nov 9, 2021, at 7:37 AM, Dongwon Kim  wrote:
> 
> Hi David,
> 
> There are currently no metrics for the async work-queue size (you should be 
> able to see the queue stats with debug logs enabled though [1]). 
> Thanks for the input but scraping DEBUG messages into, for example, 
> ElasticSearch for monitoring on Grafana is not possible in my current 
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent requests 
> and # finished/failed requests, respectively, and used the two counters to 
> calculate the inflight requests from Prometheus.
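
A rough sketch of that two-counter approach (reconstructed here with assumed String 
types and counter names; the in-flight count is then sentRequests minus 
completedRequests on the Prometheus/Grafana side):

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class CountingAsyncFunction extends RichAsyncFunction<String, String> {

    private transient Counter sentRequests;       // incremented when a request is issued
    private transient Counter completedRequests;  // incremented when it finishes or fails

    @Override
    public void open(Configuration parameters) {
        sentRequests = getRuntimeContext().getMetricGroup().counter("sentRequests");
        completedRequests = getRuntimeContext().getMetricGroup().counter("completedRequests");
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        sentRequests.inc();
        // The real async call would increment completedRequests and complete the
        // future in its callback; this sketch does both immediately to stay self-contained.
        completedRequests.inc();
        resultFuture.complete(Collections.singleton(input));
    }
}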
> 
> As far as I can tell from looking at the code, the async operator is able to 
> checkpoint even if the work-queue is exhausted.
> Oh, I didn't know that! As you pointed out and I'm going to explain below, 
> the async operator might not be the source of the problem.
> 
> I just hit the same situation and found that:
> - the # of in-flight records is zero when the backpressure is getting high
> - a taskmanager logs the following error message around the time when 
> the backpressure is getting high (none of the others do):
> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread   
>  [] - Uncaught exception in thread 'kafka-producer-network-thread 
> | producer-8':
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:118) ~[?:?]
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) 
> ~[?:?]
> at java.nio.channels.SocketChannel.write(SocketChannel.java:507) 
> ~[?:?]
> at 
> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>  ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>  ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
> 
> Can this be the reason why my pipeline is stalled and ends up with the checkpoint 
> timeout? I guess all the upstream tasks might fail to send data to the failed 
> Kafka producer and records are stacking up in buffers, which could result in 
> the back-pressure. If so, is there no mechanism in Flink to detect such an 
> error and send it to the job manager for debugging purposes?
> 
> Best,
> 
> Dongwon
> 
> 
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek wrote:
> Hi Dongwon,
> 
> There are currently no metrics for the async work-queue size (you should be 
> able to see the queue stats with debug logs enabled though [1]). As far as I 
> can tell from looking at the code, the async operator is able to checkpoint 
> even if the work-queue is exhausted.
> 
> Arvid can you please validate the above? (the checkpoints not being blocked 
> by the work queue part)
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
>  
> 
> 
> Best,
> D.
> 
> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim wrote:
> Hi community,
> 
> While using Flink's async i/o for interacting with an external system, I got