Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values
Hi, thanks for driving this @Till Rohrmann. I'd give a +1 on reducing the heartbeat timeout and interval, though I'm not sure whether 15s and 3s would be enough either. IMO, apart from the standalone cluster, which relies entirely on Flink's heartbeat mechanism, reducing the heartbeat settings also helps the JM detect more quickly those TaskExecutors that are in an abnormal condition and cannot respond to heartbeat requests, e.g. continuous full GC; the TaskExecutor process is still alive in that case, so the deployment system may not notice the problem. Since there are cases that benefit from this change, I think it is worth doing as long as it does not degrade the experience in other scenarios. If we can identify what blocks the main thread from processing heartbeats, or what drives GC costs up, we can try to eliminate those causes to get a more predictable heartbeat response time, or give advice to users whose jobs may run into these issues. For example, as far as I know the JM of a large-scale job is busier and may not be able to process heartbeats in time, so we could advise users running jobs with more than 5000 tasks to increase their heartbeat interval to 10s and the timeout to 50s (these numbers are just illustrative). As for the issue in FLINK-23216, I think it should be fixed and should not be a main concern for this case. On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann wrote: > Thanks for sharing these insights. > > I think it is no longer true that the ResourceManager notifies the > JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details. > > Given the GC pauses, would you then be ok with decreasing the heartbeat > timeout to 20 seconds? This should give enough time to do the GC and then > still send/receive a heartbeat request. > > I also wanted to add that we are about to get rid of one big cause of > blocking I/O operations from the main thread. With FLINK-22483 [2] we will > get rid of Filesystem accesses to retrieve completed checkpoints.
This > leaves us with one additional file system access from the main thread which > is the one completing a pending checkpoint. I think it should be possible > to get rid of this access because as Stephan said it only writes > information to disk that is already written before. Maybe solving these two > issues could ease concerns about long pauses of unresponsiveness of Flink. > > [1] https://issues.apache.org/jira/browse/FLINK-23216 > [2] https://issues.apache.org/jira/browse/FLINK-22483 > > Cheers, > Till > > On Wed, Jul 21, 2021 at 4:58 AM Yang Wang wrote: > >> Thanks @Till Rohrmann for starting this >> discussion >> >> Firstly, I try to understand the benefit of shorter heartbeat timeout. >> IIUC, it will make the JobManager aware of >> TaskManager faster. However, it seems that only the standalone cluster >> could benefit from this. For Yarn and >> native Kubernetes deployment, the Flink ResourceManager should get the >> TaskManager lost event in a very short time. >> >> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM >> * Less than 1 second, Flink RM has a watch for all the TaskManager pods >> >> Secondly, I am not very confident to decrease the timeout to 15s. I have >> quickly checked the TaskManager GC logs >> in the past week of our internal Flink workloads and find more than 100 >> 10-seconds Full GC logs, but no one is bigger than 15s. >> We are using CMS GC for old generation. >> >> >> Best, >> Yang >> >> Till Rohrmann 于2021年7月17日周六 上午1:05写道: >> >>> Hi everyone, >>> >>> Since Flink 1.5 we have the same heartbeat timeout and interval default >>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval: >>> 10s. These values were mainly chosen to compensate for lengthy GC pauses >>> and blocking operations that were executed in the main threads of Flink's >>> components. 
Since then, there were quite some advancements wrt the JVM's >>> GCs and we also got rid of a lot of blocking calls that were executed in >>> the main thread. Moreover, a long heartbeat.timeout causes long recovery >>> times in case of a TaskManager loss because the system can only properly >>> recover after the dead TaskManager has been removed from the scheduler. >>> Hence, I wanted to propose to change the timeout and interval to: >>> >>> heartbeat.timeout: 15s >>> heartbeat.interval: 3s >>> >>> Since there is no perfect solution that fits all use cases, I would >>> really >>> like to hear from you what you think about it and how you configure these >>> heartbeat options. Based on your experience we might actually come up >>> with >>> better default values that allow us to be resilient but also to detect >>> failed components fast. FLIP-185 can be found here [1]. >>> >>> [1] https://cwiki.apache.org/confluence/x/GAoBCw >>> >>> Cheers, >>> Till >>> >>
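A back-of-envelope sketch of the recovery-time argument in this thread. The model below is my own simplification, not Flink's actual failure-detector logic: I assume the worst case is that a component dies right after answering a heartbeat, so detection takes roughly one heartbeat interval plus the full timeout.

```python
# Rough model: a TaskManager dies immediately after answering a heartbeat.
# The JobManager then waits up to one interval before sending the next
# request and up to the full timeout before declaring the peer dead.
def worst_case_detection_s(interval_s: float, timeout_s: float) -> float:
    return interval_s + timeout_s

# Pre-FLIP-185 defaults vs. the proposed values
# (heartbeat.interval / heartbeat.timeout):
current = worst_case_detection_s(10, 50)
proposed = worst_case_detection_s(3, 15)
print(current, proposed)
```

Under this simplification the proposed defaults cut the worst-case detection time by more than a factor of three, which is where the shorter recovery times in the proposal come from.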
Flink TaskManager container got restarted by K8S very frequently
Hi Flink Community, I recently deployed a Flink cluster (1 JM, 1 TM) with the Kubernetes standalone mode. I then noticed that the pod the TM runs on gets restarted by Kubernetes very frequently (3 times within 10 minutes), and I don't see any error logs for the pod. I tried increasing the container memory in both the flink-conf.yaml file and the Kubernetes manifest, but that didn't solve the problem either. Are there any other issues that could cause this? My Kubernetes cluster has 5 nodes, each node has 4 vCPUs and 16 GB memory, and the TM is not running any job.

flink-conf.yaml:

jobmanager.memory.process.size: 1600Mb
jobmanager.rpc.address: flink-test-job-jobmanager-service
blob.server.port: 6124
query.server.port: 6125
taskmanager.memory.process.size: 2048Mb
taskmanager.numberOfTaskSlots: 1
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory
heartbeat.interval: 1000
heartbeat.timeout: 5000

Task manager yaml file:

spec:
  containers:
  - name: taskmanager
    image: ###
    imagePullPolicy: Always
    command: ["taskmanager.sh"]
    args: ["start-foreground"]
    env:
    - name: JOB_MANAGER_RPC_ADDRESS
      value: flink-test-job-jobmanager-service
    resources:
      limits:
        cpu: 4
        memory: "4096Mi"
      requests:
        cpu: 1
        memory: "2048Mi"
    ports:
    - containerPort: 6122
      name: rpc
    livenessProbe:
      tcpSocket:
        port: 6122
      initialDelaySeconds: 30
      periodSeconds: 60
    volumeMounts:
    - name: flink-config-volume
      mountPath: /opt/flink/conf/
    securityContext:
      runAsUser:
  volumes:
  - name: flink-config-volume
    configMap:
      name: test-job-config
      items:
      - key: flink-conf.yaml
        path: flink-conf.yaml
      - key: log4j-console.properties
        path: log4j-console.properties
      - key: log4j-cli.properties
        path: log4j-cli.properties
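When Kubernetes restarts a container without any error in the application log, the restart reason is usually visible on the Kubernetes side. Commands like the following typically show it (pod name and namespace below are placeholders for your deployment):

```shell
POD=flink-taskmanager-0
NS=default

# Restart reason in the container status: OOMKilled, liveness probe failure, etc.
kubectl describe pod "$POD" -n "$NS"

# Logs of the *previous* container instance, i.e. the one that was restarted:
kubectl logs "$POD" -n "$NS" --previous

# Cluster events around the restart (probe failures, evictions, OOM kills):
kubectl get events -n "$NS" --sort-by=.lastTimestamp
```

The "Last State" and "Reason" fields in the describe output usually distinguish an OOM kill from a failed liveness probe.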
Need help of deploying Flink HA on kubernetes cluster
Hi, I am very new to Flink. I am planning to install a Flink HA setup on an EKS cluster with 5 worker nodes. Can someone please point me to the right materials or direction for installing it, as well as any sample job I can run just for testing, to confirm everything is working as expected? --Dhirendra
Re: confirm subscribe to user@flink.apache.org
need to be part of flink mailing list On Wednesday, July 21, 2021, 11:22:14 PM AST, user-h...@flink.apache.org wrote: Hi! This is the ezmlm program. I'm managing the user@flink.apache.org mailing list. To confirm that you would like userdh...@yahoo.com added to the user mailing list, please send a short reply to this address: user-sc.1626924133.plfdinihnlgmmkeoefpi-userdhiru=yahoo@flink.apache.org Usually, this happens when you just hit the "reply" button.
Questions about keyed streams
Hi. 1) If I use the same key in downstream operators (my key is a user id), will the rows stay on the same TaskManager machine? I join in more info based on the user id as the key. I'd like these to stay on the same machine rather than shuffling a bunch of user-specific info to multiple task manager machines. 2) What are best practices for reducing the number of shuffles when having multiple Kafka topics with similar keys (user id)? E.g., should I make sure the same key writes to the same partition number and then manually assign which Flink tasks get which Kafka partitions?
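The "same key, same partition" idea in question 2 can be sketched in a few lines. Note the hash below is a stand-in for illustration: Kafka's default partitioner actually uses murmur2, and Flink's keyBy uses murmur-hashed key groups, so the concrete partition numbers will differ, but the property being relied on is the same.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # crc32 is used here only because it is deterministic across processes
    # (Python's built-in hash() for str is salted per process). The point:
    # a deterministic hash mod partition count sends the same key to the
    # same partition / subtask on every call.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# The same user id maps to the same partition on every call:
p1 = partition_for("user-42", 8)
p2 = partition_for("user-42", 8)
```

Even if producers align Kafka partitioning this way, a keyBy in Flink will still hash-redistribute records after the source; avoiding that shuffle for an already-partitioned stream requires the experimental DataStreamUtils.reinterpretAsKeyedStream, and only under its documented preconditions.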
How to ensure robustness when unioning multiple source streams with the union operator
When using the union operator to union multiple streams, e.g. stream1.union(stream2, stream3, ... streamN), where streams 1 to N come from different MQ clusters (MQ1, MQ2, ... MQn): when a few of those clusters go down, the whole Flink application restarts. In this scenario, how can we make it so that when a few streams fail, the union of the other streams is not affected and the Flink job keeps running? [image: image.png] BR Fisher
Re: Recover from savepoints with Kubernetes HA
Hi Thomas, I've got a few questions that will hopefully help us find an answer: What job properties are you trying to change? Something like parallelism? What mode is your job running in? i.e., Session, Per-Job, or Application? Can you also describe how you're redeploying the job? Are you using the Native Kubernetes integration or Standalone (i.e. writing k8s manifest files yourself)? It sounds like you are using the Flink CLI as well, is that correct? Thanks, Austin On Wed, Jul 21, 2021 at 4:05 AM Thms Hmm wrote: > Hey, > > we have some application clusters running on Kubernetes and explore the HA > mode which is working as expected. When we try to upgrade a job, e.g. > trigger a savepoint, cancel the job and redeploy, Flink is not restarting > from the savepoint we provide using the -s parameter. So all state is lost. > > If we just trigger the savepoint without canceling the job and redeploy > the HA mode picks up from the latest savepoint. > > But this way we can not upgrade job properties as they were picked up from > the savepoint as it seems. > > Is there any advice on how to do upgrades with HA enabled? > > Flink version is 1.12.2. > > Thanks for your help. > > Kr thomas >
Reply: Seeking advice on log collection and monitoring for on-yarn per-job applications
Just configure the InfluxDB reporter and use a Grafana dashboard for visualization; it is very convenient and a single machine is enough.
Re: Kafka data sources, multiple interval joins and backfilling
Hi Dan, unfortunately Flink currently provides no source-level synchronization, except for Kinesis [1], so it's easy to run into large state when processing historical data. There is an ongoing effort to provide a generic watermark-based alignment of FLIP-27 sources [2], which will most likely help mitigate the issue. [1] https://issues.apache.org/jira/browse/FLINK-10886 [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources Best, D. On Wed, Jul 21, 2021 at 8:43 AM JING ZHANG wrote: > Hi Dan, > > I've tried playing around with parallelism and resources. It does help. > Glad to hear your problem is solved . > > > Does Flink have special logic with the built in interval join code that > impacts how kafka data sources are read? > No. If you said the way I mentioned in the last email, I mean to add > control the consumption order of each source in a custom Kafka connector. > > Dan Hill 于2021年7月21日周三 下午2:10写道: > >> Thanks JING and Caizhi! >> >> Yea, I've tried playing around with parallelism and resources. It does >> help. >> >> We have our own join operator that acts like an interval join (with fuzzy >> matching). We wrote our own KeyedCoProcessFunction and modeled it closely >> after the internal interval join code. Does Flink have special logic with >> the built in interval join code that impacts how kafka data sources are >> read? >> >> >> >> On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG wrote: >> >>> Hi Dan, >>> You are right. In interval join, if one of input stream is far ahead of >>> the other one, its data would be buffered into state until watermark of the >>> other input stream catches up. >>> This is a known issue of interval join. And this situation is even worse >>> in your example because of the following reasons: >>> 1. Running as backfills >>> 2. There are cascading interval joins in the topology >>> >>> There is a hack way to walk around, hope it helps.
Control the consume >>> data of each source based on the following sequence: >>> 1. Consume the larger data source in the same join after the smaller >>> source consumption finished. >>> 2. Consume the source in the following join after the previous join >>> finished >>> >>> BTW: Please double check you use interval join instead of regular join, >>> this would happen if compare two field with regular timestamp type in join >>> condition instead of time attribute. >>> >>> Best, >>> JING ZHANG >>> >>> Dan Hill 于2021年7月21日周三 上午4:25写道: >>> Hi. My team's flink job has cascading interval joins. The problem I'm outlining below is fine when streaming normally. It's an issue with backfills. We've been running into a bunch of backfills to evaluate the job over older data. When running as backfills, I've noticed that sometimes one of downstream kafka inputs will read in a lot of data from it's kafka source before the upstream kafka sources makes much progress. The downstream kafka source gets far ahead of the interval join window constrained by the upstream sources. This appears to cause the state to grow unnecessarily and has caused checkpoint failures. I assumed there was built in Flink code to not get too far ahead for a single downstream kafka source. Looking through the code, I don't think this exists. Is this a known issue with trying to use Flink to backfill? Am I misunderstanding something? Here's an example flow chart for a cascading join job. One of the right kafka data sources goes 10x-100x more records than the left data sources and causes state to grow. [image: Screen Shot 2021-07-20 at 1.02.27 PM.png] >>>
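The watermark-alignment idea behind FLIP-182 can be illustrated with a toy, in-memory simulation. The assumptions are mine: each source is just an ascending list of event timestamps, and a source may only emit while it is at most max_drift ahead of the slowest live source.

```python
def aligned_read(sources, max_drift):
    """Toy simulation of watermark-based source alignment: at each step,
    emit from a source whose next timestamp is within max_drift of the
    slowest source's next timestamp. A fast source is therefore paused
    until the slow one catches up, which bounds the buffered state."""
    cursors = [0] * len(sources)
    emitted = []

    def watermark(i):
        c = cursors[i]
        return sources[i][c] if c < len(sources[i]) else float("inf")

    while any(cursors[i] < len(sources[i]) for i in range(len(sources))):
        low = min(watermark(i) for i in range(len(sources)))
        # The slowest source always qualifies, so progress is guaranteed.
        for i in range(len(sources)):
            if cursors[i] < len(sources[i]) and watermark(i) <= low + max_drift:
                emitted.append((i, sources[i][cursors[i]]))
                cursors[i] += 1
                break
    return emitted
```

With a fast second source ([10, 11]) and max_drift=2, the simulation holds the fast source back until the slow one ([1, 2, 3]) is fully consumed, which is exactly the behavior that keeps interval-join state small during backfills.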
Re: Stateful Functions Status
Not yet, unfortunately. But I'd be very happy to work with the community on a JS SDK. On Tue, Jul 20, 2021 at 4:32 PM Omid Bakhshandeh wrote: > Igal, > > Thanks for the answers. Is there any JS SDK available? > > Best, > --Omid > > On Tue, Jul 20, 2021 at 10:23 AM Igal Shilman wrote: > >> Hi Omid, >> >> I'm glad to hear that you are evaluating StateFun in your company! let me >> try to answer your questions: >> >> 1. In version 2.x, StateFun only supported messages of type >> com.google.protobuf.Any, and we had a tiny optimization that >> reads type hints and unpacked the real message out of the Any message. >> Version 3.x removed protobuf from the API surface (hence no more Any) but >> while Protobuf is not a requirement, you can still use Protobuf to send >> and receive messages by using [1][2]. >> >> 2. The benefits of gRPC in StateFun can be a long discussion, since >> currently StateFun does use Protobuf over HTTP/2 (if the remote function's >> app server supports that), and with a built-in backpressure mechanism >> (backpressure comes from Flink). >> Having said that, we are currently working on making the transport >> pluggable, and gRPC based transport is a natural candidate. >> >> 3. I think this is a very interesting point, and StateFun doesn't promote >> any specific paradigm here. >> >> The basic building block is a function, a specific function is uniquely >> addressed by providing a namespace, a function type, and an id. >> a group of functions that implement a specific API can share a namespace >> prefix, for example "com.foo.api.auth/". >> You can perhaps (by convention) have a public function per namespace that >> exposes some sort of an API (list of messages that it supports) >> And it can dispatch the messages internally to the various functions.
>> >> Alternatively, a "client" library for your auth API can be a Python class >> with named methods that accepts a StateFun context, and translates a method >> invocation to a message sent to the corresponding function. The clients >> of your functions will basically invoke methods on an object. >> >> Perhaps a generated interface described by gRPC, is a good idea to >> explore further :-) >> >> 4. I'm not sure what KNative example you are looking for, as StateFun >> remote functions do not require any specific type of deployment, they are >> like regular Flask services. >> >> Looking forward to learning what you've built :-) >> Good luck! >> Igal. >> >> [1] >> https://github.com/apache/flink-statefun-playground/blob/release-3.0/python/showcase/showcase/showcase_custom_types.py#L28 >> [2] >> https://github.com/apache/flink-statefun-playground/blob/release-3.0/python/showcase/showcase/__main__.py#L89,L91 >> >> >> >> On Tue, Jul 20, 2021 at 3:07 AM Omid Bakhshandeh < >> omidbakhshan...@gmail.com> wrote: >> >>> Hi, >>> >>> We are evaluating Flink Stateful Functions in our company and we are >>> trying to see if it fits our needs. I'm hoping to get some help from the >>> community as we do this. >>> >>> There are a couple of primary questions that can speed up our process: >>> >>> 1- It seems in version 2.2.0, in the Python SDK, it was possible to have >>> messages with a specific type because everything was Protobuf but in 3.0.0 >>> that is not possible and there is always some boilerplate to convert >>> messages. >>> >>> @functions.bind("showcase/messaging") def messaging(context: Context, message: Message): >>> >>> vs >>> def greet(context, greet_request: GreetRequest): >>> >>> >>> Is that right? >>> >>> >>> 2- Is GRPC and maybe more efficient protocols part of the roadmap in the >>> near future? >>> >>> 3- All of the examples I found on the Python SDK, all the function has >>> been written in a single file with no specific structure (e.g. 
>>> implementing an API or ...), is there a better way to create Functions in a >>> more structured way? How can one share these functions within teams and >>> other projects? It would be great if something like GRPC services and API >>> exists for functions so other users can get into the dev cycle. >>> >>> 4- Is there any KNative example? >>> >>> I hope these questions make sense. >>> Thanks, >>> -- >>> >>> Omid >>> >> > > -- > --- > Omid >
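The per-namespace dispatch convention from point 3 above can be sketched in plain Python. This is not the StateFun SDK: the registry, bind, and send helpers below are hypothetical stand-ins for StateFun's namespace/type addressing, showing how a "public" function per namespace can route messages to internal ones.

```python
# Minimal function registry keyed by "namespace/type", mimicking how
# StateFun addresses functions (here without ids or state).
registry = {}

def bind(typename):
    """Register a function under its 'namespace/type' address."""
    def deco(fn):
        registry[typename] = fn
        return fn
    return deco

def send(typename, message):
    """Deliver a message to the function registered at that address."""
    return registry[typename](message)

@bind("com.foo.api.auth/login")
def login(message):
    # an "internal" function of the auth namespace
    return f"login:{message}"

@bind("com.foo.api.auth/public")
def auth_api(message):
    # the per-namespace "public" entry point: dispatch on a message field
    op, _, payload = message.partition(":")
    return send(f"com.foo.api.auth/{op}", payload)
```

Teams then only need to know the public address of a namespace; the internal functions behind it can be restructured freely, which is the sharing model the thread suggests.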
Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values
Thanks for sharing these insights. I think it is no longer true that the ResourceManager notifies the JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details. Given the GC pauses, would you then be ok with decreasing the heartbeat timeout to 20 seconds? This should give enough time to do the GC and then still send/receive a heartbeat request. I also wanted to add that we are about to get rid of one big cause of blocking I/O operations from the main thread. With FLINK-22483 [2] we will get rid of Filesystem accesses to retrieve completed checkpoints. This leaves us with one additional file system access from the main thread which is the one completing a pending checkpoint. I think it should be possible to get rid of this access because as Stephan said it only writes information to disk that is already written before. Maybe solving these two issues could ease concerns about long pauses of unresponsiveness of Flink. [1] https://issues.apache.org/jira/browse/FLINK-23216 [2] https://issues.apache.org/jira/browse/FLINK-22483 Cheers, Till On Wed, Jul 21, 2021 at 4:58 AM Yang Wang wrote: > Thanks @Till Rohrmann for starting this discussion > > Firstly, I try to understand the benefit of shorter heartbeat timeout. > IIUC, it will make the JobManager aware of > TaskManager faster. However, it seems that only the standalone cluster > could benefit from this. For Yarn and > native Kubernetes deployment, the Flink ResourceManager should get the > TaskManager lost event in a very short time. > > * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM > * Less than 1 second, Flink RM has a watch for all the TaskManager pods > > Secondly, I am not very confident to decrease the timeout to 15s. I have > quickly checked the TaskManager GC logs > in the past week of our internal Flink workloads and find more than 100 > 10-seconds Full GC logs, but no one is bigger than 15s. > We are using CMS GC for old generation. 
> > > Best, > Yang > > Till Rohrmann 于2021年7月17日周六 上午1:05写道: > >> Hi everyone, >> >> Since Flink 1.5 we have the same heartbeat timeout and interval default >> values that are defined as heartbeat.timeout: 50s and heartbeat.interval: >> 10s. These values were mainly chosen to compensate for lengthy GC pauses >> and blocking operations that were executed in the main threads of Flink's >> components. Since then, there were quite some advancements wrt the JVM's >> GCs and we also got rid of a lot of blocking calls that were executed in >> the main thread. Moreover, a long heartbeat.timeout causes long recovery >> times in case of a TaskManager loss because the system can only properly >> recover after the dead TaskManager has been removed from the scheduler. >> Hence, I wanted to propose to change the timeout and interval to: >> >> heartbeat.timeout: 15s >> heartbeat.interval: 3s >> >> Since there is no perfect solution that fits all use cases, I would really >> like to hear from you what you think about it and how you configure these >> heartbeat options. Based on your experience we might actually come up with >> better default values that allow us to be resilient but also to detect >> failed components fast. FLIP-185 can be found here [1]. >> >> [1] https://cwiki.apache.org/confluence/x/GAoBCw >> >> Cheers, >> Till >> >
Unsubscribe
Unsubscribe
Recover from savepoints with Kubernetes HA
Hey, we have some application clusters running on Kubernetes and have been exploring the HA mode, which is working as expected. But when we try to upgrade a job, e.g. trigger a savepoint, cancel the job, and redeploy, Flink does not restart from the savepoint we provide via the -s parameter, so all state is lost. If we just trigger the savepoint without cancelling the job and redeploy, the HA mode picks up from the latest savepoint. But this way we cannot upgrade job properties, as they seem to be picked up from the savepoint. Is there any advice on how to do upgrades with HA enabled? Flink version is 1.12.2. Thanks for your help. Kr thomas
Re: Seeking advice on log collection and monitoring for on-yarn per-job applications
We monitor input/output QPS fluctuations on the source and sink side; the results are decent and the approach is fairly mature. yihan xu 于2021年7月21日周三 下午12:48写道: > Our jobs have basically been running half-blind; after a small production incident recently, we are looking into how to collect job logs or metrics in real time and set up alerting. > A first search online suggests prometheus+grafana or ELK. > > May I ask what approaches your projects currently use? I'm the only person working on Flink at our small company, I picked it up halfway through and my skills are limited, so could you recommend an approach that is easy to maintain and has few pitfalls? Thanks. > > Sent from my iPhone
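For reference, an InfluxDB metrics reporter is configured in flink-conf.yaml roughly like this (a sketch only: the exact keys depend on the Flink version, and the host and database names below are placeholders):

```yaml
# Hypothetical flink-conf.yaml fragment for the InfluxDB reporter.
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: influxdb.internal
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics
metrics.reporter.influxdb.interval: 30 SECONDS
```

The per-operator numRecordsInPerSecond / numRecordsOutPerSecond metrics exported this way are what the QPS dashboards mentioned above are built on.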
Re: Kafka data sources, multiple interval joins and backfilling
Hi Dan, > I've tried playing around with parallelism and resources. It does help. Glad to hear your problem is solved. > Does Flink have special logic with the built in interval join code that impacts how kafka data sources are read? No. If you are referring to the approach I mentioned in my last email, I meant adding control of the consumption order of each source in a custom Kafka connector. Dan Hill 于2021年7月21日周三 下午2:10写道: > Thanks JING and Caizhi! > > Yea, I've tried playing around with parallelism and resources. It does > help. > > We have our own join operator that acts like an interval join (with fuzzy > matching). We wrote our own KeyedCoProcessFunction and modeled it closely > after the internal interval join code. Does Flink have special logic with > the built in interval join code that impacts how kafka data sources are > read? > > > > On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG wrote: > >> Hi Dan, >> You are right. In interval join, if one of input stream is far ahead of >> the other one, its data would be buffered into state until watermark of the >> other input stream catches up. >> This is a known issue of interval join. And this situation is even worse >> in your example because of the following reasons: >> 1. Running as backfills >> 2. There are cascading interval joins in the topology >> >> There is a hack way to walk around, hope it helps. Control the consume >> data of each source based on the following sequence: >> 1. Consume the larger data source in the same join after the smaller >> source consumption finished. >> 2. Consume the source in the following join after the previous join >> finished >> >> BTW: Please double check you use interval join instead of regular join, >> this would happen if compare two field with regular timestamp type in join >> condition instead of time attribute. >> >> Best, >> JING ZHANG >> >> Dan Hill 于2021年7月21日周三 上午4:25写道: >> >>> Hi. My team's flink job has cascading interval joins.
The problem I'm >>> outlining below is fine when streaming normally. It's an issue with >>> backfills. We've been running into a bunch of backfills to evaluate the >>> job over older data. >>> >>> When running as backfills, I've noticed that sometimes one of downstream >>> kafka inputs will read in a lot of data from it's kafka source before the >>> upstream kafka sources makes much progress. The downstream kafka source >>> gets far ahead of the interval join window constrained by the upstream >>> sources. This appears to cause the state to grow unnecessarily and has >>> caused checkpoint failures. I assumed there was built in Flink code to not >>> get too far ahead for a single downstream kafka source. Looking through >>> the code, I don't think this exists. >>> >>> Is this a known issue with trying to use Flink to backfill? Am I >>> misunderstanding something? >>> >>> Here's an example flow chart for a cascading join job. One of the right >>> kafka data sources goes 10x-100x more records than the left data sources >>> and causes state to grow. >>> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png] >>> >>
Re: Kafka data sources, multiple interval joins and backfilling
Thanks JING and Caizhi! Yea, I've tried playing around with parallelism and resources. It does help. We have our own join operator that acts like an interval join (with fuzzy matching). We wrote our own KeyedCoProcessFunction and modeled it closely after the internal interval join code. Does Flink have special logic with the built in interval join code that impacts how kafka data sources are read? On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG wrote: > Hi Dan, > You are right. In interval join, if one of input stream is far ahead of > the other one, its data would be buffered into state until watermark of the > other input stream catches up. > This is a known issue of interval join. And this situation is even worse > in your example because of the following reasons: > 1. Running as backfills > 2. There are cascading interval joins in the topology > > There is a hack way to walk around, hope it helps. Control the consume > data of each source based on the following sequence: > 1. Consume the larger data source in the same join after the smaller > source consumption finished. > 2. Consume the source in the following join after the previous join > finished > > BTW: Please double check you use interval join instead of regular join, > this would happen if compare two field with regular timestamp type in join > condition instead of time attribute. > > Best, > JING ZHANG > > Dan Hill 于2021年7月21日周三 上午4:25写道: > >> Hi. My team's flink job has cascading interval joins. The problem I'm >> outlining below is fine when streaming normally. It's an issue with >> backfills. We've been running into a bunch of backfills to evaluate the >> job over older data. >> >> When running as backfills, I've noticed that sometimes one of downstream >> kafka inputs will read in a lot of data from it's kafka source before the >> upstream kafka sources makes much progress. The downstream kafka source >> gets far ahead of the interval join window constrained by the upstream >> sources. 
This appears to cause the state to grow unnecessarily and has >> caused checkpoint failures. I assumed there was built in Flink code to not >> get too far ahead for a single downstream kafka source. Looking through >> the code, I don't think this exists. >> >> Is this a known issue with trying to use Flink to backfill? Am I >> misunderstanding something? >> >> Here's an example flow chart for a cascading join job. One of the right >> kafka data sources goes 10x-100x more records than the left data sources >> and causes state to grow. >> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png] >> >