Detecting when a job is "caught up"

2022-12-07 Thread Kurtis Walker via user
I’ve got a Flink job that uses a HybridSource.  It reads a bunch of S3 files and then transitions to a Kafka topic, where it stays processing live data.  Everything gets written to a warehouse where users build and run reports.  It takes about 36 hours to read the data from the beginning before it’s caught up to the live data.  I’m working on how to switch from V1 of my job to V2 without any downtime for the users.  Ideally I can start V2, let it read all the data from the beginning again, write it to a V2 warehouse, and only when it’s caught up, stop the V1 job and notify my reporting engine to use V2 instead of V1.  Is there any way for Flink to trigger some function when V2 is caught up, so that my code could make the switch?  Even something that could be called when the HybridSource switches from S3 to Kafka would be close enough for my purposes.

Thanks,

Kurt
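
(One possible way to approximate “caught up”, sketched here purely as an illustration rather than anything from this thread: a pass-through ProcessFunction that fires a one-time hook once record event timestamps get close to wall-clock time. It assumes event-time timestamps are assigned upstream; the class name and the hook are hypothetical.)

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: flags the first record whose event time is within maxLagMillis
// of the current wall-clock time, as a rough per-subtask "caught up" signal.
public class CatchUpDetector<T> extends ProcessFunction<T, T> {

    private final long maxLagMillis;     // e.g. 5 * 60 * 1000L for five minutes
    private transient boolean caughtUp;  // not checkpointed; resets on restart

    public CatchUpDetector(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long eventTime = ctx.timestamp();  // null if no timestamp is assigned
        if (!caughtUp && eventTime != null
                && System.currentTimeMillis() - eventTime <= maxLagMillis) {
            caughtUp = true;
            // Hypothetical hook: notify the reporting engine to switch to V2,
            // e.g. via an HTTP call or a marker row written to the warehouse.
        }
        out.collect(value);
    }
}

Each parallel subtask flips its flag independently, so something external (or a parallelism-1 operator downstream) would still have to decide when the job as a whole is caught up.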


Multiple sources ordering

2021-10-21 Thread Kurtis Walker
I have a case where my Flink job needs to consume multiple sources.  I have a topic in Kafka where the order of consumption is important.  Because S3 storage costs much less than retention on Kafka, we have a job that sinks the topic to S3, and the Kafka topic then retains just 3 days’ worth of data.  My job needs to first consume everything from the existing S3 file(s) and only then start consuming from the Kafka topic.  When using a union operator in Flink, the data arrives interleaved from both sources.  Is there any way I can control the ordering so that the job first reads S3, then Kafka, all in the same job?

Kurt
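
(For reference, the HybridSource mentioned in the 2022 thread above is one answer to exactly this ordering question: it reads a bounded source to completion and only then switches to the next one. A minimal sketch, assuming a recent Flink release with the file and Kafka connectors on the classpath; the exact connector class names have shifted across versions, and the bucket, topic, broker address, and text-line format below are placeholders:)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3ThenKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded source over the archived S3 files; read first, to completion.
        FileSource<String> archived = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-archive-bucket/topic-dump/"))
                .build();

        // Unbounded Kafka source that takes over once the file source finishes.
        // In practice the starting offsets would need to line up with where the
        // S3 archive ends; earliest() is just a placeholder here.
        KafkaSource<String> live = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("my-topic")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        HybridSource<String> s3ThenKafka = HybridSource.builder(archived).addSource(live).build();

        env.fromSource(s3ThenKafka, WatermarkStrategy.noWatermarks(), "s3-then-kafka").print();
        env.execute("s3-then-kafka");
    }
}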


Re: Using s3 bucket for high availability

2021-06-09 Thread Kurtis Walker
Thank you, I figured it out.  My IAM policy was missing some actions.  Seems I 
needed to give it “*” for it to work.

From: Tamir Sagi 
Date: Wednesday, June 9, 2021 at 6:02 AM
To: Yang Wang , Kurtis Walker 
Cc: user 
Subject: Re: Using s3 bucket for high availability

I'd try several things.

First, try accessing the bucket from the CLI locally:

https://docs.aws.amazon.com/cli/latest/reference/s3/

If that does not work, please check your credentials under the ~/.aws/credentials and ~/.aws/config files, since the AWS clients read credentials from these files by default (unless other credentials are set).

If everything works locally:

  1.  Are you running inside EKS? If so, you must attach to the pods a service account with the corresponding S3 permissions.
  2.  If not, make sure the pods have the AWS credentials (access key, secret key, region); see the sketch below.

Please also provide a more complete code snippet.
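
(A sketch of one way to cover point 2 using Flink's own S3 filesystem options in flink-conf.yaml, as an alternative to pod environment variables; the values are placeholders, and IAM roles / service accounts are generally preferable to static keys:)

s3.access-key: <your-access-key-id>
s3.secret-key: <your-secret-access-key>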

I recently ran a Flink job on an Application cluster in EKS; that job also reads files from S3 (without HA).

Tamir


From: Yang Wang 
Sent: Wednesday, June 9, 2021 11:29 AM
To: Kurtis Walker 
Cc: user 
Subject: Re: Using s3 bucket for high availability



It seems to be an S3 issue, but I am not sure it is the root cause. Could you please share more details from the JobManager log?

Or could you verify that the Flink cluster can access the S3 bucket successfully (e.g. store a checkpoint) when HA is disabled?

Best,
Yang

Kurtis Walker <kurtis.wal...@sugarcrm.com> wrote on Tue, Jun 8, 2021 at 11:00 PM:

Sorry, fat-fingered the send before I finished writing….



Hello,

  I’m trying to set up my Flink native Kubernetes cluster with high availability.  Here’s the relevant config:



kubernetes.service-account: flink-service-account

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.storageDir: s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery



I’m getting an error accessing the bucket.



2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client [] - Bucket region cache doesn't have an entry for corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket region from Amazon S3.

2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson [] - Failed to parse JSON string.

com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (String)""; line: 1, column: 0]
 at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]



Is there an additional config I need for specifying the region for the bucket?  
I’ve been searching the doc and haven’t found anything like that.





From: Kurtis Walker <kurtis.wal...@sugarcrm.com>
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user <user@flink.apache.org>
Subject: Using s3 bucket for high availability

Hello,

  I’m trying to set up my Flink native Kubernetes cluster with high availability.  Here’s the relevant config:



Re: Using s3 bucket for high availability

2021-06-08 Thread Kurtis Walker
Sorry, fat-fingered the send before I finished writing….

Hello,
  I’m trying to set up my Flink native Kubernetes cluster with high availability.  Here’s the relevant config:

kubernetes.service-account: flink-service-account
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery

I’m getting an error accessing the bucket.

2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client [] - Bucket region cache doesn't have an entry for corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket region from Amazon S3.
2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson [] - Failed to parse JSON string.
com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (String)""; line: 1, column: 0]
 at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]

Is there an additional config I need for specifying the region for the bucket?  
I’ve been searching the doc and haven’t found anything like that.
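
(For what it's worth, the 2021-06-09 follow-up above reports that the eventual fix was missing IAM actions rather than a region setting. If pinning the bucket region ever does become necessary, one documented knob for Flink's S3 filesystems is a region-specific endpoint in flink-conf.yaml; the region below is only an example:)

s3.endpoint: s3.us-west-2.amazonaws.com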


From: Kurtis Walker 
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user 
Subject: Using s3 bucket for high availability
Hello,
  I’m trying to set up my Flink native Kubernetes cluster with high availability.  Here’s the relevant config:


Using s3 bucket for high availability

2021-06-08 Thread Kurtis Walker
Hello,
  I’m trying to set up my Flink native Kubernetes cluster with high availability.  Here’s the relevant config:


Re: Backpressure configuration

2021-04-29 Thread Kurtis Walker
Thanks Roman.  I had already tried disableChaining, but it didn’t have any effect.  The built-in JDBC sink is really slow compared to a bulk load (close to 100x), but I had tested it as well and saw the same issue.  When a given message triggers the JDBC sink to write a batch, everything else waits for it.

From: Roman Khachatryan 
Date: Thursday, April 29, 2021 at 2:49 PM
To: Kurtis Walker 
Cc: user@flink.apache.org 
Subject: Re: Backpressure configuration

Hello Kurt,

Assuming that your sink is blocking, I would first make sure that it
is not chained with the preceding operators. Otherwise, the same
thread will output data and perform windowing/triggering.
You can add disableChaining after addSink to prevent this [1].
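
(Applied to the job quoted below, that would look roughly like the following; CopySink and the sink name are taken from that snippet:)

.addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions)))
        .name("copysink")
        .disableChaining();   // keep the sink out of the upstream operator chain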

Besides that, you probably could use existing JDBC batching
functionality by configuring JdbcExecutionOptions [2] and providing it
to JdbcSink.sink() [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html
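
(And a sketch of the JdbcSink batching suggested in [2] and [3], assuming, purely for illustration, a DataStream<String> of already-serialized rows; the SQL, table name, and connection details are placeholders:)

import java.sql.PreparedStatement;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class JdbcBatchingSketch {
    // Attaches a batching JDBC sink: rows are buffered and flushed either
    // every 5000 records or at least once a minute, whichever comes first.
    static void attachJdbcSink(DataStream<String> rows) {
        rows.addSink(JdbcSink.sink(
                "INSERT INTO my_table (payload) VALUES (?)",          // placeholder SQL
                (PreparedStatement stmt, String row) -> stmt.setString(1, row),
                JdbcExecutionOptions.builder()
                        .withBatchSize(5000)
                        .withBatchIntervalMs(60_000)
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://greenplum-host:5432/warehouse")  // placeholder URL
                        .withDriverName("org.postgresql.Driver")
                        .build()));
    }
}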

Regards,
Roman

On Thu, Apr 29, 2021 at 6:12 PM Kurtis Walker
 wrote:
>
> Hello,
>
>   I’m building a POC for a Flink app that loads data from Kafka into a 
> Greenplum data warehouse.  I’ve built a custom sink operation that will bulk 
> load a number of rows.  I’m using a global window, triggering on number of 
> records, to collect the rows for each sink.  I’m finding that while the sink 
> is running, the previous operations of my app stop processing messages until 
> the sink operation completes.  I guess this is the backpressure logic kicking 
> in.  The cost being I get about 60% of the throughput that is theoretically 
> possible.  Is there any configuration that would let me control that 
> backpressure so that Flink will buffer rows when it encounters backpressure?  
> In the ideal world when a sink operation completes, the next batch of rows is 
> ready for the sink to pick up immediately.  Here’s my code:
>
>
>
> env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
>     .keyBy((KeySelector) value -> value.getPayload().getSource().getTable())
>     .window(GlobalWindows.create())
>     .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
>     .aggregate(new ListAggregator())
>     .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");
>
>
>
>
>
> Thanks!
>
>
>
> Kurt
>
>


Backpressure configuration

2021-04-29 Thread Kurtis Walker
Hello,
  I’m building a POC for a Flink app that loads data from Kafka into a Greenplum data warehouse.  I’ve built a custom sink operation that bulk loads a number of rows.  I’m using a global window, triggering on the number of records, to collect the rows for each sink invocation.  I’m finding that while the sink is running, the previous operations of my app stop processing messages until the sink operation completes.  I guess this is the backpressure logic kicking in.  The cost is that I get about 60% of the throughput that is theoretically possible.  Is there any configuration that would let me control that backpressure so that Flink buffers rows when it encounters it?  Ideally, when a sink operation completes, the next batch of rows is ready for the sink to pick up immediately.  Here’s my code:

env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
    .keyBy((KeySelector) value -> value.getPayload().getSource().getTable())
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
    .aggregate(new ListAggregator())
    .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");


Thanks!

Kurt