Detecting when a job is "caught up"
I’ve got a Flink job that uses a HybridSource. It reads a bunch of S3 files and then transitions to a Kafka topic, where it stays processing live data. Everything gets written to a warehouse where users build and run reports. It takes about 36 hours to read the data from the beginning before it’s caught up to the live data. I’m working on how to switch from V1 of my job to V2 without any downtime for the user. Ideally I can start V2, let it read all the data from the beginning again, write it to a V2 warehouse, and only when it’s caught up, stop the V1 job and notify my reporting engine to use V2 instead of V1. Is there any way for Flink to trigger some function when V2 is caught up, so that my code could make the switch? Even something that could be called when the HybridSource switches from S3 to Kafka would be close enough for my purposes. Thanks, Kurt
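As far as I know, Flink does not expose a user-facing callback for "caught up" (or for the HybridSource switch), so one pragmatic workaround is to watch the V2 job's consumer lag on the Kafka topic from a small external script and flip the reporting engine once the lag stays below a threshold. Below is a minimal sketch of that check using the Kafka AdminClient; the bootstrap servers, the consumer group id "v2-job", and the lag threshold are assumptions for illustration, and it presumes the V2 job commits its offsets back to Kafka (e.g. on checkpoints).

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class CatchUpCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: your broker address
    try (AdminClient admin = AdminClient.create(props)) {
      // Offsets committed by the V2 job's consumer group (assumption: group id "v2-job").
      // Note: if no offsets have been committed yet, this map is empty and the lag below
      // would misleadingly be zero.
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets("v2-job").partitionsToOffsetAndMetadata().get();

      // Latest (log-end) offsets for the same partitions.
      Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
          .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
          admin.listOffsets(latestSpec).all().get();

      // Total lag = sum over partitions of (log-end offset - committed offset).
      long totalLag = committed.entrySet().stream()
          .mapToLong(e -> latest.get(e.getKey()).offset() - e.getValue().offset())
          .sum();

      // Treat "caught up" as lag below a small threshold (assumption: 1000 records).
      if (totalLag < 1_000) {
        System.out.println("V2 is caught up; switch reporting to the V2 warehouse.");
      } else {
        System.out.println("V2 still behind by " + totalLag + " records.");
      }
    }
  }
}

Run on a schedule (cron, a Kubernetes Job, etc.) and perform the V1-to-V2 switch only after the check passes a few times in a row, so a momentary dip in lag during the S3 backfill doesn't trigger it early.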
Multiple sources ordering
I have a case where my Flink job needs to consume multiple sources. I have a topic in Kafka where the order of consuming is important. Because S3 storage costs much less than Kafka storage, we have a job that sinks to S3, and the Kafka topic can then retain just 3 days' worth of data. My job needs to first consume everything from the existing S3 file(s) and only then start consuming from the Kafka topic. When using a union operator in Flink, the data comes in mixed from both sources. Is there any way I can control the ordering so that it reads S3 first and then Kafka, all in the same job? Kurt
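This ordered S3-then-Kafka handoff is the use case HybridSource (mentioned in the thread above) targets: it reads a bounded source to completion and only then starts the next one, within a single job. Below is a minimal sketch assuming Flink 1.15+ with the unified FileSource and KafkaSource connectors; the bucket path, topic, bootstrap servers, and group id are placeholders, and the simple earliest-offset start would need to be replaced with an offset or timestamp initializer that lines up with where the S3 archive ends, to avoid gaps or duplicates.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3ThenKafkaJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Bounded source over the historical data that was sunk to S3 (placeholder path).
    FileSource<String> s3Source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/archive/"))
        .build();

    // Unbounded source over the live topic. Starting from "earliest" is only a placeholder;
    // an offsets/timestamp initializer matching the end of the S3 archive avoids duplicates.
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("my-topic")
        .setGroupId("s3-then-kafka")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    // HybridSource reads the bounded file source to completion, then switches to Kafka.
    HybridSource<String> hybrid = HybridSource.builder(s3Source)
        .addSource(kafkaSource)
        .build();

    env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "s3-then-kafka")
        .print();

    env.execute("S3 then Kafka");
  }
}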
Re: Using s3 bucket for high availability
Thank you, I figured it out. My IAM policy was missing some actions. It seems I needed to give it "*" for it to work.

From: Tamir Sagi
Date: Wednesday, June 9, 2021 at 6:02 AM
To: Yang Wang, Kurtis Walker
Cc: user
Subject: Re: Using s3 bucket for high availability

I'd try several things.

Try accessing the bucket from the CLI locally first: https://docs.aws.amazon.com/cli/latest/reference/s3/
If it does not work, please check your credentials under the ~/.aws/credentials and ~/.aws/config files, since the AWS clients read credentials from these files by default (unless other credentials are set).

If everything works well:
1. Are you running inside EKS? If so, you must attach a service account with the corresponding S3 permissions to the pods.
2. If not, make sure the pods have the AWS credentials (access key, secret key, region).

Please provide more of the code. I recently ran a Flink job on an Application cluster in EKS; the job also reads files from S3 (without HA).

Tamir

From: Yang Wang
Sent: Wednesday, June 9, 2021 11:29 AM
To: Kurtis Walker
Cc: user
Subject: Re: Using s3 bucket for high availability

It seems to be an S3 issue, and I am not sure it is the root cause. Could you please share more details of the JobManager log? Or could you verify that the Flink cluster can access the S3 bucket successfully (e.g. store the checkpoint) when HA is disabled?

Best,
Yang

Kurtis Walker <kurtis.wal...@sugarcrm.com> wrote on Tuesday, June 8, 2021 at 11:00 PM:

Sorry, fat finger send before I finished writing…. Hello, I'm trying to set up my flink native Kubernetes cluster with High availability. Here's the relevant config:

kubernetes.service-account: flink-service-account
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery

I'm getting an error accessing the bucket.

2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client [] - Bucket region cache doesn't have an entry for corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket region from Amazon S3.
2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson [] - Failed to parse JSON string. com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (String)""; line: 1, column: 0] at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]

Is there an additional config I need for specifying the region for the bucket? I've been searching the doc and haven't found anything like that.

From: Kurtis Walker <kurtis.wal...@sugarcrm.com>
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user <user@flink.apache.org>
Subject: Using s3 bucket for high availability

Hello, I'm trying to set up my flink native Kubernetes cluster with High availability. Here's the relevant config:
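Regarding the IAM fix at the top of this thread: rather than granting "*", a scoped policy is usually enough. Below is a sketch (not taken from the thread) of the S3 actions the flink-s3-fs filesystems and the HA storage dir commonly need, including s3:GetBucketLocation, which lines up with the bucket-region lookup in the debug log above. The bucket name is the one from the config; verify the exact action list against your environment.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::corvana-target-file-upload-k8s-usw2.dev.sugar.build"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery/*"
    }
  ]
}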
Re: Using s3 bucket for high availability
Sorry, fat finger send before I finished writing…. Hello, I'm trying to set up my flink native Kubernetes cluster with High availability. Here's the relevant config:

kubernetes.service-account: flink-service-account
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://corvana-target-file-upload-k8s-usw2.dev.sugar.build/flink/recovery

I'm getting an error accessing the bucket.

2021-06-08 14:33:42,189 DEBUG com.amazonaws.services.s3.AmazonS3Client [] - Bucket region cache doesn't have an entry for corvana-target-file-upload-k8s-usw2.dev.sugar.build. Trying to get bucket region from Amazon S3.
2021-06-08 14:33:42,193 DEBUG com.amazonaws.util.json.Jackson [] - Failed to parse JSON string. com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (String)""; line: 1, column: 0] at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-s3-fs-presto-1.13.0.jar:1.13.0]

Is there an additional config I need for specifying the region for the bucket? I've been searching the doc and haven't found anything like that.

From: Kurtis Walker
Date: Tuesday, June 8, 2021 at 10:55 AM
To: user
Subject: Using s3 bucket for high availability

Hello, I'm trying to set up my flink native Kubernetes cluster with High availability. Here's the relevant config:
Using s3 bucket for high availability
Hello, I’m trying to set up my flink native Kubernetes cluster with High availability. Here’s the relevant config:
Re: Backpressure configuration
Thanks Roman. I had already tried disableChaining; it didn't have any effect. The built-in JDBC sink is really slow compared to a bulk load (close to 100x), but I had tested that and saw the same issue. When a given message triggers the JDBC sink to write a batch, everything else waits for it.

From: Roman Khachatryan
Date: Thursday, April 29, 2021 at 2:49 PM
To: Kurtis Walker
Cc: user@flink.apache.org
Subject: Re: Backpressure configuration

Hello Kurt,

Assuming that your sink is blocking, I would first make sure that it is not chained with the preceding operators. Otherwise, the same thread will output data and perform windowing/triggering. You can add disableChaining after addSink to prevent this [1].

Besides that, you probably could use the existing JDBC batching functionality by configuring JdbcExecutionOptions [2] and providing it to JdbcSink.sink() [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html

Regards,
Roman

On Thu, Apr 29, 2021 at 6:12 PM Kurtis Walker wrote:
> Hello,
> I'm building a POC for a Flink app that loads data from Kafka into a Greenplum data warehouse. I've built a custom sink operation that will bulk load a number of rows. I'm using a global window, triggering on number of records, to collect the rows for each sink. I'm finding that while the sink is running, the previous operations of my app stop processing messages until the sink operation completes. I guess this is the backpressure logic kicking in. The cost being I get about 60% of the throughput that is theoretically possible. Is there any configuration that would let me control that backpressure so that Flink will buffer rows when it encounters backpressure? In the ideal world, when a sink operation completes, the next batch of rows is ready for the sink to pick up immediately.
> Here's my code:
>
> env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
>     .keyBy((KeySelector) value -> value.getPayload().getSource().getTable())
>     .window(GlobalWindows.create())
>     .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
>     .aggregate(new ListAggregator())
>     .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");
>
> Thanks!
>
> Kurt
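To make suggestions [2]/[3] above concrete, here is a rough sketch of the stock JdbcSink with batching configured through JdbcExecutionOptions. The INSERT statement, the hypothetical Event type with getId()/getPayload(), the DataStream<Event> variable named stream, and the connection settings are all placeholders, not code from this thread.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Batches up to 5000 rows (or whatever has arrived after 60 s) per executeBatch() call,
// so the sink does the buffering instead of a global window upstream.
stream.addSink(JdbcSink.sink(
    "INSERT INTO events (id, payload) VALUES (?, ?)",
    (statement, record) -> {
        statement.setLong(1, record.getId());
        statement.setString(2, record.getPayload());
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(5000)
        .withBatchIntervalMs(60_000)
        .withMaxRetries(3)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://greenplum-host:5432/warehouse")
        .withDriverName("org.postgresql.Driver")
        .withUsername("flink")
        .withPassword("secret")
        .build()));

Per Kurt's reply above, plain JDBC batching was far slower than his bulk-load CopySink, so this is mainly useful as a baseline or fallback rather than a drop-in replacement.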
Backpressure configuration
Hello, I'm building a POC for a Flink app that loads data from Kafka into a Greenplum data warehouse. I've built a custom sink operation that will bulk load a number of rows. I'm using a global window, triggering on number of records, to collect the rows for each sink. I'm finding that while the sink is running, the previous operations of my app stop processing messages until the sink operation completes. I guess this is the backpressure logic kicking in. The cost being I get about 60% of the throughput that is theoretically possible. Is there any configuration that would let me control that backpressure so that Flink will buffer rows when it encounters backpressure? In the ideal world, when a sink operation completes, the next batch of rows is ready for the sink to pick up immediately. Here's my code:

env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
    .keyBy((KeySelector) value -> value.getPayload().getSource().getTable())
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
    .aggregate(new ListAggregator())
    .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");

Thanks!

Kurt
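For reference, a minimal sketch of where the disableChaining suggestion from the reply above would go in this pipeline: the call is appended to the DataStreamSink returned by addSink, so the sink runs in its own task and the window operator can keep collecting the next batch while a copy is in flight. This only illustrates placement using the same classes as the code above; per the reply further up, it did not resolve the issue in this particular case.

env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
    .keyBy((KeySelector) value -> value.getPayload().getSource().getTable())
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
    .aggregate(new ListAggregator())
    .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions)))
    .name("copysink")
    // Break the operator chain so the sink runs in a separate task from the
    // window/aggregate operator; the network buffers between the two tasks can
    // then absorb part of the backpressure while a batch is being written.
    .disableChaining();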