Re: [DISCUSS] Releasing Flink 1.11.4
Hi Till,

Sorry for the late reply. Over the previous period I was focused on other urgent work and had to suspend the release work. I've recently restarted it.

Best,
Godfrey

Till Rohrmann wrote on Tue, Jul 13, 2021 at 8:36 PM:

> Hi Godfrey,
>
> Are you continuing with the 1.11.4 release process?
>
> Cheers,
> Till
>
> On Tue, Jul 6, 2021 at 1:15 PM Chesnay Schepler wrote:
>
>> Since 1.11.4 is about releasing the commits we have already merged
>> between 1.11.3 and 1.13.0, I would suggest not adding additional fixes.
>>
>> On 06/07/2021 12:47, Matthias Pohl wrote:
>>
>>> Hi Godfrey,
>>> Thanks for volunteering to be the release manager for 1.11.4.
>>> FLINK-21445 [1] has a backport PR for 1.11.4 [2] prepared. I wouldn't
>>> label it as a blocker, but it would be nice to have it included in
>>> 1.11.4 considering that it's quite unlikely there will be another
>>> 1.11.5 release. Right now, AzureCI is running as a final step. I'm
>>> CC'ing Chesnay because he would be in charge of merging the PR.
>>>
>>> Matthias
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-21445
>>> [2] https://github.com/apache/flink/pull/16387
>>>
>>> On Wed, Jun 30, 2021 at 2:15 PM godfrey he wrote:
>>>
>>>> Hi devs,
>>>>
>>>> As discussed in [1], I would like to start a discussion for releasing
>>>> Flink 1.11.4.
>>>>
>>>> I would like to volunteer as the release manager for 1.11.4, and will
>>>> start the release process next Wednesday (July 7th).
>>>>
>>>> There are 75 issues that have been closed or resolved [2], and no
>>>> blocker issues left [3] so far.
>>>>
>>>> If any issues need to be marked as blockers for 1.11.4, please let me
>>>> know in this thread!
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>> [1] https://lists.apache.org/thread.html/r40a541027c6a04519f37c61f2a6f3dabdb821b3760cda9cc6ebe6ce9%40%3Cdev.flink.apache.org%3E
>>>> [2] https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.11.4%20AND%20status%20in%20(Closed%2C%20Resolved)
>>>> [3] https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.11.4%20AND%20status%20not%20in%20(Closed%2C%20Resolved)%20ORDER%20BY%20priority%20DESC
[jira] [Created] (FLINK-23446) Refactor SQL Client end to end tests
Shengkai Fang created FLINK-23446:
-------------------------------------
Summary: Refactor SQL Client end to end tests
Key: FLINK-23446
URL: https://issues.apache.org/jira/browse/FLINK-23446
Project: Flink
Issue Type: Improvement
Components: Table SQL / Client, Tests
Affects Versions: 1.14.0
Reporter: Shengkai Fang
Fix For: 1.14.0

Remove usage of YAML in the SQL Client end-to-end tests.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-23445) Separate data and timer connection into different channels for Python Table API operators
Dian Fu created FLINK-23445:
-------------------------------------
Summary: Separate data and timer connection into different channels for Python Table API operators
Key: FLINK-23445
URL: https://issues.apache.org/jira/browse/FLINK-23445
Project: Flink
Issue Type: Sub-task
Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
Fix For: 1.14.0

This is a follow-up of FLINK-23401, where we split the data and timer connections into different channels for Python DataStream operators.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-23444) Slf4j 1.7.15 has the high-risk vulnerability CVE-2018-8088
Hui Wang created FLINK-23444:
-------------------------------------
Summary: Slf4j 1.7.15 has the high-risk vulnerability CVE-2018-8088
Key: FLINK-23444
URL: https://issues.apache.org/jira/browse/FLINK-23444
Project: Flink
Issue Type: Improvement
Components: API / Core
Affects Versions: 1.13.1, 1.12.3, 1.13.0, 1.11.3
Reporter: Hui Wang

Slf4j 1.7.15 has the high-risk vulnerability CVE-2018-8088 [1]. When can Flink be upgraded to a more secure version?

[1] https://nvd.nist.gov/vuln/detail/CVE-2018-8088

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
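Until the bundled version is bumped, users whose dependency scanners flag this sometimes pin a patched slf4j in their own build. The snippet below is a hypothetical user-side Maven sketch, not something from this ticket: the 1.7.32 version choice is an assumption (CVE-2018-8088 is reported fixed in 1.7.26+), and this only affects slf4j jars resolved onto the user classpath, not jars shaded inside Flink's distribution.

```xml
<!-- Hypothetical workaround sketch in the user's pom.xml: force a patched
     slf4j-api for transitively resolved dependencies. Version is an
     assumption; verify against your dependency tree. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.32</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```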
[jira] [Created] (FLINK-23443) LaunchCoordinatorTest fails on azure
Xintong Song created FLINK-23443:
-------------------------------------
Summary: LaunchCoordinatorTest fails on azure
Key: FLINK-23443
URL: https://issues.apache.org/jira/browse/FLINK-23443
Project: Flink
Issue Type: Bug
Components: Deployment / Mesos
Affects Versions: 1.13.1
Reporter: Xintong Song
Fix For: 1.13.2

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20751=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=ea63c80c-957f-50d1-8f67-3671c14686b9=25444

{code}
Jul 20 21:54:52 [ERROR] Tests run: 24, Failures: 0, Errors: 5, Skipped: 0, Time elapsed: 8.366 s <<< FAILURE! - in org.apache.flink.mesos.scheduler.LaunchCoordinatorTest
Jul 20 21:54:52 [ERROR] The LaunchCoordinator when in state GatheringOffers should handle StateTimeout which transitions to Idle when task queue is empty(org.apache.flink.mesos.scheduler.LaunchCoordinatorTest) Time elapsed: 0.117 s <<< ERROR!
Jul 20 21:54:52 java.lang.IllegalStateException: cannot reserve actor name '$$I': already terminated
Jul 20 21:54:52 	at akka.actor.dungeon.ChildrenContainer$TerminatedChildrenContainer$.reserve(ChildrenContainer.scala:94)
Jul 20 21:54:52 	at akka.actor.dungeon.Children$class.reserveChild(Children.scala:135)
Jul 20 21:54:52 	at akka.actor.ActorCell.reserveChild(ActorCell.scala:429)
Jul 20 21:54:52 	at akka.testkit.TestActorRef.<init>(TestActorRef.scala:33)
Jul 20 21:54:52 	at akka.testkit.TestFSMRef.<init>(TestFSMRef.scala:40)
Jul 20 21:54:52 	at akka.testkit.TestFSMRef$.apply(TestFSMRef.scala:91)
Jul 20 21:54:52 	at org.apache.flink.mesos.scheduler.LaunchCoordinatorTest$Context.<init>(LaunchCoordinatorTest.scala:254)
Jul 20 21:54:52 	at org.apache.flink.mesos.scheduler.LaunchCoordinatorTest$$anonfun$1$$anonfun$apply$mcV$sp$8$$anonfun$apply$mcV$sp$15$$anonfun$apply$mcV$sp$33$$anon$24.<init>(LaunchCoordinatorTest.scala:446)
Jul 20 21:54:52 	at org.apache.flink.mesos.scheduler.LaunchCoordinatorTest$$anonfun$1$$anonfun$apply$mcV$sp$8$$anonfun$apply$mcV$sp$15$$anonfun$apply$mcV$sp$33.apply(LaunchCoordinatorTest.scala:446)
Jul 20 21:54:52 	at org.apache.flink.mesos.scheduler.LaunchCoordinatorTest$$anonfun$1$$anonfun$apply$mcV$sp$8$$anonfun$apply$mcV$sp$15$$anonfun$apply$mcV$sp$33.apply(LaunchCoordinatorTest.scala:446)
Jul 20 21:54:52 	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
Jul 20 21:54:52 	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
Jul 20 21:54:52 	at org.scalatest.Transformer.apply(Transformer.scala:22)
Jul 20 21:54:52 	at org.scalatest.Transformer.apply(Transformer.scala:20)
Jul 20 21:54:52 	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
Jul 20 21:54:52 	at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
Jul 20 21:54:52 	at org.apache.flink.mesos.scheduler.LaunchCoordinatorTest.withFixture(LaunchCoordinatorTest.scala:57)
Jul 20 21:54:52 	at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1075)
Jul 20 21:54:52 	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
Jul 20 21:54:52 	at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
Jul 20 21:54:52 	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
Jul 20 21:54:52 	at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
Jul 20 21:54:52 	at org.apache.flink.mesos.scheduler.LaunchCoordinatorTest.runTest(LaunchCoordinatorTest.scala:57)
Jul 20 21:54:52 	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
Jul 20 21:54:52 	at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
Jul 20 21:54:52 	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
Jul 20 21:54:52 	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
Jul 20 21:54:52 	at scala.collection.immutable.List.foreach(List.scala:392)
Jul 20 21:54:52 	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
Jul 20 21:54:52 	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
Jul 20 21:54:52 	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
Jul 20 21:54:52 	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
Jul 20 21:54:52 	at scala.collection.immutable.List.foreach(List.scala:392)
Jul 20 21:54:52 	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
Jul 20 21:54:52 	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
Jul 20 21:54:52 	at
Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values
Thanks @Till Rohrmann for starting this discussion.

Firstly, I am trying to understand the benefit of a shorter heartbeat timeout. IIUC, it will make the JobManager aware of a lost TaskManager faster. However, it seems that only standalone clusters would benefit from this. For the Yarn and native Kubernetes deployments, the Flink ResourceManager already gets the TaskManager-lost event in a very short time:
* about 8 seconds on Yarn: 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
* less than 1 second on Kubernetes: the Flink RM has a watch on all the TaskManager pods

Secondly, I am not very confident about decreasing the timeout to 15s. I quickly checked the TaskManager GC logs of our internal Flink workloads from the past week and found more than 100 Full GCs of around 10 seconds, but none longer than 15s. We are using the CMS GC for the old generation.

Best,
Yang

Till Rohrmann wrote on Sat, Jul 17, 2021 at 1:05 AM:

> Hi everyone,
>
> Since Flink 1.5 we have had the same heartbeat timeout and interval
> default values, defined as heartbeat.timeout: 50s and
> heartbeat.interval: 10s. These values were mainly chosen to compensate
> for lengthy GC pauses and blocking operations that were executed in the
> main threads of Flink's components. Since then, there have been quite
> some advancements wrt the JVM's GCs, and we also got rid of a lot of
> blocking calls that were executed in the main thread. Moreover, a long
> heartbeat.timeout causes long recovery times in case of a TaskManager
> loss, because the system can only properly recover after the dead
> TaskManager has been removed from the scheduler. Hence, I want to
> propose changing the timeout and interval to:
>
> heartbeat.timeout: 15s
> heartbeat.interval: 3s
>
> Since there is no perfect solution that fits all use cases, I would
> really like to hear what you think about it and how you configure these
> heartbeat options. Based on your experience we might actually come up
> with better default values that allow us to be resilient but also to
> detect failed components fast. FLIP-185 can be found here [1].
>
> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>
> Cheers,
> Till
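For reference, the values under discussion map to flink-conf.yaml entries, so anyone wanting to experiment can set the proposed FLIP-185 values explicitly today. A sketch (note: in the releases current at the time of this thread these two options take a plain number of milliseconds, so the second-based values from the mail are written out as ms here — verify against your version's configuration reference):

```yaml
# Defaults since Flink 1.5: heartbeat.timeout 50 s, heartbeat.interval 10 s.
# FLIP-185 proposes 15 s / 3 s; both options are configured in milliseconds:
heartbeat.timeout: 15000
heartbeat.interval: 3000
```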
[jira] [Created] (FLINK-23442) FileSystemFactory refactor in flink-table-runtime
Yubin Li created FLINK-23442:
-------------------------------------
Summary: FileSystemFactory refactor in flink-table-runtime
Key: FLINK-23442
URL: https://issues.apache.org/jira/browse/FLINK-23442
Project: Flink
Issue Type: Improvement
Components: FileSystems, Table SQL / Runtime
Affects Versions: 1.14.0
Reporter: Yubin Li

The FileSystemFactory interface has different definitions in the following modules:
* flink-table-runtime
* flink-core

Once FileSystem.initialize() is invoked, only classes implementing the FileSystemFactory of flink-core can be loaded. We can implement PluginFileSystemFactory to wrap such classes, but we can't do the same refactor for the factory in flink-table-runtime.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-23441) Remove CheckpointOptions Argument away from Snapshotable#snapshot
Yuan Mei created FLINK-23441:
-------------------------------------
Summary: Remove CheckpointOptions Argument away from Snapshotable#snapshot
Key: FLINK-23441
URL: https://issues.apache.org/jira/browse/FLINK-23441
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Reporter: Yuan Mei

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-23440) The "State. Backend" of "Flink-CONF.YAML" is inconsistent with the documentation
yang89520 created FLINK-23440:
-------------------------------------
Summary: The "State. Backend" of "Flink-CONF.YAML" is inconsistent with the documentation
Key: FLINK-23440
URL: https://issues.apache.org/jira/browse/FLINK-23440
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration
Affects Versions: 1.13.1
Reporter: yang89520
Attachments: flink-conf.png, flink1_13_doc.png

The Flink 1.13 documentation describes the state backends as "HashMapStateBackend" and "EmbeddedRocksDBStateBackend", but the "flink-conf.yaml" configuration file supports the values 'jobmanager', 'filesystem', and 'rocksdb'; the two are not in sync.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
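For context, my understanding of the 1.13 state-backend rework (an assumption from the 1.13 migration notes, not stated in the ticket — verify against the docs) is that the shorthand names accepted by `state.backend` map onto the documented classes roughly like this:

```yaml
# Values accepted by state.backend in flink-conf.yaml (Flink 1.13):
#   hashmap -> HashMapStateBackend           (new name)
#   rocksdb -> EmbeddedRocksDBStateBackend   (new name)
#   jobmanager / filesystem -> legacy names kept for backwards compatibility
state.backend: hashmap
```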
Re: [DISCUSS] FLIP-180: Adjust StreamStatus and Idleness definition
This proposal to narrow the definition of idleness to focus on the event-time clock is great. Please mention that the "temporary status toggle" code will be removed.

I agree with adding the markActive() functionality, for symmetry. Speaking of symmetry, could we now include the minor enhancement we discussed in FLIP-167, the exposure of watermark status changes on the Sink interface? I drafted a PR and would be happy to revisit it:
https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70

The FLIP mentions a 'watermarkstatus' package for the WatermarkStatus class. Should it be the 'eventtime' package?

Regarding the change of 'streamStatus' to 'watermarkStatus', could you spell out what the new method names will be on each interface? May I suggest that Input.emitStreamStatus be Input.processStreamStatus? This is to help decouple the input's watermark status from the output's watermark status. I observe that AbstractStreamOperator is hardcoded to derive the output channel's status from the input channel's status. May I suggest we refactor AbstractStreamOperator::emitStreamStatus(StreamStatus, int) to allow the operator subclass to customize the processing of the aggregated watermark and watermark status?

Maybe the FLIP should spell out the expected behavior of the generic watermark generator (TimestampsAndWatermarksOperator). Should the generator ignore the upstream idleness signal? I believe it propagates the signal, even though it also generates its own signals. Given that source-based and generic watermark generation shouldn't be combined, one could argue that the generic watermark generator should activate only when its input channel's watermark status is idle.

Thanks again for this effort!
-Eron

On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise wrote:

> Dear devs,
>
> We recently discovered that StreamStatus and Idleness are insufficiently
> defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> good to hear more opinions on the matter. Ideally, we can make the
> changes for 1.14, as some newly added methods are affected.
>
> Best,
>
> Arvid
>
> [1] https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> [2] https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
[jira] [Created] (FLINK-23439) Update "Local Installation" Page
Seth Wiesman created FLINK-23439:
-------------------------------------
Summary: Update "Local Installation" Page
Key: FLINK-23439
URL: https://issues.apache.org/jira/browse/FLINK-23439
Project: Flink
Issue Type: Improvement
Components: chinese-translation
Reporter: Seth Wiesman

Sync translation with ae136d18d84ed4d4ff631e990fd57e8c1867b30f

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: [DISCUSS] FLIP-177: Extend Sink API
Hi Guowei,

1. Your idea is quite similar to FLIP-171 [1]. The question is whether we implement FLIP-171 based on public interfaces (which would require exposing MailboxExecutor as described here in FLIP-177), or whether it's better to implement it internally and hide it. The first option is an abstract base class; your second option would be an abstract interface with a matching internal implementation (similar to AsyncIO). There is an example for option 1 in [2]; I think the idea was to additionally specify the batch size and batch timeout in the ctor. @Hausmann, Steffen knows more.

2. I guess your question is whether the current AsyncIO is not already sufficient if exactly-once is not needed? The answer is currently no, because AsyncIO does not do any batching. The user could do batching before it, but that's quite a bit of code. However, we should really think about whether AsyncIO should also support batching. I would also say that the scopes of AsyncIO and AsyncSink are quite different: the first is for application developers, while the second is for connector developers and would be deemed an implementation detail by the application developer. Of course, advanced users may fall into both categories, so the distinction does not always hold. Nevertheless, there is some overlap between the two approaches, and it's important to consider whether the added complexity warrants the benefit. It would be interesting to hear how other devs see that.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
[2] https://github.com/sthm/flink/blob/51614dc9371d6e352db768a404ba3cafddad08f0/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java

On Tue, Jul 20, 2021 at 11:11 AM Guowei Ma wrote:

> Hi, Arvid
>
> Thank you, Arvid, for perfecting the Sink through this FLIP. I have two
> small questions.
>
> 1. What do you think of us directly providing an interface as follows?
> In this way, there may be no need to expose the Mailbox to the user. We
> can implement an `AsyncSinkWriterOperator` to control the length of the
> queue. If it is too long, do not call SinkWriter::write.
>
> public interface AsyncSinkWriter<InputT, CommT, WriterStateT>
>         extends SinkWriter<Tuple2<InputT, XXXFuture<?>>, CommT, WriterStateT> {
>     // please ignore the name of Tuple2 and XXXFuture at first
>     int getQueueLimit();
> }
>
> 2. Another question: if users don't care about exactly-once and the
> unification of stream and batch, how about letting users use
> `AsyncFunction` directly? I don't have an answer either. I want to hear
> your suggestions.
>
> Best,
> Guowei
>
> On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise wrote:
>
>> Dear devs,
>>
>> today I'd like to start the discussion on the Sink API. I have drafted
>> a FLIP [1] with an accompanying PR [2].
>>
>> This FLIP is a bit special as it's actually a few smaller amend-FLIPs
>> in one. In this discussion, we should decide on the scope and cut out
>> too invasive steps if we can't reach an agreement.
>>
>> Step 1 is to add a few more pieces of information to context objects.
>> That's non-breaking and needed for the async communication pattern in
>> FLIP-171 [3]. While we need to add a new public API (MailboxExecutor),
>> I think that this should entail the least discussion.
>>
>> Step 2 is to also offer the same context information to committers.
>> Here we can offer some compatibility methods so as not to break
>> existing sinks. The main use case would be some async exactly-once
>> sink, but I'm not sure we would use async communication patterns at
>> all here (or simply wait for all async requests to finish in a sync
>> way). It may also help with async cleanup tasks, though.
>>
>> While drafting step 2, I noticed the big entanglement of the current
>> API. To figure out whether there is a committer during stream graph
>> creation, we actually need to create a committer, which can have
>> unforeseen consequences. Thus, I spiked whether we can disentangle the
>> interface and have separate interfaces for the different use cases.
>> The resulting step 3 would be a completely breaking change and is thus
>> probably controversial. However, I'd also see the disentanglement as a
>> way to prepare Sinks to be more expressive (write and commit
>> coordinator) without completely overloading the main interface.
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API
>> [2] https://github.com/apache/flink/pull/16399
>> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
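To make Guowei's queue-limit idea concrete, here is a hypothetical, self-contained sketch. The `SinkWriter`/`AsyncSinkWriter` types below are simplified stand-ins for the real Flink interfaces (names follow the mail, not flink-core), and the loop plays the role of the proposed `AsyncSinkWriterOperator`: it checks getQueueLimit() and simply skips write() while too many requests are in flight.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in for Flink's SinkWriter; NOT the real interface.
interface SinkWriter<InputT> {
    void write(InputT element);
}

// The proposed extension: the writer advertises a queue limit so that a
// wrapping operator can stop calling write() while the queue is full.
interface AsyncSinkWriter<InputT> extends SinkWriter<InputT> {
    int getQueueLimit();
}

public class AsyncSinkSketch {
    public static void main(String[] args) {
        final Queue<String> inFlight = new ArrayDeque<>();
        AsyncSinkWriter<String> writer = new AsyncSinkWriter<String>() {
            @Override
            public void write(String element) {
                inFlight.add(element); // pretend this starts an async request
            }

            @Override
            public int getQueueLimit() {
                return 2;
            }
        };

        int accepted = 0;
        for (String e : new String[] {"a", "b", "c"}) {
            // Operator-side backpressure check: do not call write() when full.
            if (inFlight.size() < writer.getQueueLimit()) {
                writer.write(e);
                accepted++;
            }
        }
        System.out.println("accepted=" + accepted); // prints accepted=2
    }
}
```

In a real operator the queue would drain as async responses complete (and a full queue would block rather than drop), but the sketch shows why no MailboxExecutor needs to be exposed for this variant: the limit check lives entirely in the operator.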
[jira] [Created] (FLINK-23438) Bump httpclient from 4.5.3 to 4.5.13
Martijn Visser created FLINK-23438:
-------------------------------------
Summary: Bump httpclient from 4.5.3 to 4.5.13
Key: FLINK-23438
URL: https://issues.apache.org/jira/browse/FLINK-23438
Project: Flink
Issue Type: Improvement
Reporter: Martijn Visser
Assignee: Martijn Visser

Flink is still using org.apache.httpcomponents:httpclient 4.5.3. That version is flagged for CVE-2020-13956 (though Flink is not impacted by it). The latest available version is 4.5.13.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: [VOTE] Release flink-shaded 14.0, release candidate 1
+1 (binding)

* checked the website PR
* verified changed versions and NOTICE files since version 13.0
* verified the checksum and the signature

Best,
Dawid

On 19/07/2021 10:58, Timo Walther wrote:

> +1 (binding)
>
> I went through all commits one more time and could not spot anything
> that would block a release.
>
> Thanks Chesnay!
>
> Timo
>
> On 15.07.21 09:02, Chesnay Schepler wrote:
>
>> Hi everyone,
>> Please review and vote on the release candidate #1 for the version
>> 14.0, as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>> The complete staging area is available for your review, which includes:
>> * JIRA release notes [1],
>> * the official Apache source release to be deployed to
>>   dist.apache.org [2], which is signed with the key with fingerprint
>>   C2EED7B111D464BA [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag [5],
>> * website pull request listing the new release [6].
>>
>> The vote will be open for at least 72 hours. It is adopted by
>> majority approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>> Chesnay
>>
>> [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350408
>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-14.0-rc1/
>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>> [4] https://repository.apache.org/content/repositories/orgapacheflink-1435
>> [5] https://github.com/apache/flink-shaded/commits/release-14.0-rc1
>> [6] https://github.com/apache/flink-web/pull/458
Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values
+1 to this change!

When I was working on the reactive mode blog post [1], I also ran into this issue, leading to a poor "out of the box" experience when scaling down. For my experiments, I chose a timeout of 8 seconds, and the cluster has been running for 76 days (so far) on Kubernetes.

I also consider this change somewhat low-risk, because we can provide a quick fix for people running into problems.

[1] https://flink.apache.org/2021/05/06/reactive-mode.html

On Fri, Jul 16, 2021 at 7:05 PM Till Rohrmann wrote:

> Hi everyone,
>
> Since Flink 1.5 we have had the same heartbeat timeout and interval
> default values, defined as heartbeat.timeout: 50s and
> heartbeat.interval: 10s. These values were mainly chosen to compensate
> for lengthy GC pauses and blocking operations that were executed in the
> main threads of Flink's components. Since then, there have been quite
> some advancements wrt the JVM's GCs, and we also got rid of a lot of
> blocking calls that were executed in the main thread. Moreover, a long
> heartbeat.timeout causes long recovery times in case of a TaskManager
> loss, because the system can only properly recover after the dead
> TaskManager has been removed from the scheduler. Hence, I want to
> propose changing the timeout and interval to:
>
> heartbeat.timeout: 15s
> heartbeat.interval: 3s
>
> Since there is no perfect solution that fits all use cases, I would
> really like to hear what you think about it and how you configure these
> heartbeat options. Based on your experience we might actually come up
> with better default values that allow us to be resilient but also to
> detect failed components fast. FLIP-185 can be found here [1].
>
> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>
> Cheers,
> Till
Re: [DISCUSS] FLIP-173: Support DAG of algorithms (Flink ML)
Hi Becket,

Thank you for the detailed reply! My understanding of your comments is that most of option-1 looks good except for its change of the Transformer semantics. Please see my reply inline.

On Tue, Jul 20, 2021 at 11:43 AM Becket Qin wrote:

> Hi Dong, Zhipeng and Fan,
>
> Thanks for the detailed proposals. It is quite a lot of reading! Given
> that we are introducing a lot of stuff here, I find that it might be
> easier for people to discuss if we can list the fundamental differences
> first. From what I understand, the very fundamental difference between
> the two proposals is the following:
>
> *In order to support a graph structure, do we extend
> Transformer/Estimator, or do we introduce a new set of APIs?*
>
> Proposal 1 tries to keep one set of APIs, which is based on
> Transformer/Estimator/Pipeline. More specifically, it does the following:
> - Make Transformer and Estimator multi-input and multi-output (MIMO).
> - Introduce Graph/GraphModel as counterparts of Pipeline/PipelineModel.
>
> Proposal 2 leaves the existing Transformer/Estimator/Pipeline as is.
> Instead, it introduces AlgoOperators to support the graph structure. The
> AlgoOperators are general-purpose graph nodes supporting MIMO. They are
> independent of Pipeline / Transformer / Estimator.
>
> My two cents:
>
> I think it is a big advantage to have a single set of APIs rather than
> two independent sets, if possible. But I would suggest we change the
> current proposal 1 a little bit, by learning from proposal 2.
>
> What I like about proposal 1:
> 1. A single set of APIs, symmetric in Graph/GraphModel and
> Pipeline/PipelineModel.
> 2. Keeping most of the benefits of Transformer/Estimator, including the
> fit-then-transform relation and the save/load capability.
>
> However, proposal 1 also introduced some changes that I am not sure about:
>
> 1. The most controversial part of proposal 1 is whether we should extend
> Transformer/Estimator/Pipeline. In fact, different projects have
> slightly different designs for Transformer/Estimator/Pipeline, so I
> think it is OK to extend them. However, there are some commonly
> recognized core semantics that we ideally want to keep. To me these core
> semantics are:
>   1. Transformer is a Data -> Data conversion; Estimator deals with a
>      Data -> Model conversion.
>   2. Estimator.fit() gives a Transformer, and users can just call
>      Transformer.transform() to perform inference.
> To me, as long as these core semantics are kept, an extension to the API
> seems acceptable.
>
> Proposal 1 extends the semantic of Transformer from Data -> Data
> conversion to generic Table -> Table conversion, and claims it is
> equivalent to

I am a bit confused by this description of the semantic change. By "from Data -> Data conversion to generic Table -> Table", do you mean "Table != Data"? I am hoping we can clearly define the semantics of Transformer and agree on its definition within the community. Then we can see how option-1 changes its semantics and update option-1 as appropriate.

> "AlgoOperator" in proposal 2 as a general-purpose graph node. It does
> change the first semantic. That said, this might just be a naming
> problem, though. One possible solution to this problem is having a new
> subclass of Stage without strong conventional semantics, e.g. "AlgoOp"
> if we borrow the name from proposal 2, and let Transformer extend it.
> Just like a

Assuming that option-1 does change the meaning of Transformer (this point depends on the answer to the previous question), I am happy to update option-1 to address the concern. Here is how I would update option-1:

- Add an interface named "MLOperator". This interface will be the same as the existing Transformer interface in option-1 except for its name.
- Make Transformer a subclass of MLOperator. Because Transformer has exactly the same API as MLOperator, no extra method is defined in Transformer.
- Every other API/class in option-1 which deals with Transformer will now deal with MLOperator.

The change proposed above basically renames Transformer to address the semantic/naming concern. Does this sound good?

> PipelineModel is a more specific Transformer, a Transformer would be a
> more specific "AlgoOp". If we do that, the processing logic that people
> don't feel comfortable calling a Transformer can just be put into an
> "AlgoOp" and thus can still be added to a Pipeline / Graph. This borrows
> the advantage of proposal 2. In other words, this essentially makes the
> "AlgoOp" the equivalent of "AlgoOperator" in proposal 2, but allows it
> to be added to the Graph and Pipeline if people want to.
>
> This also gives my thoughts regarding the concern that making
> Transformer/Estimator MIMO would break the convention of single-input
> single-output (SISO) Transformer/Estimator. Since this does not change
> the core semantics of Transformer/Estimator, it sounds like an intuitive
> extension to me.
>
> 2. Another semantic related case brought up
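The Transformer/Estimator core semantics that this thread keeps returning to ("Transformer is a Data -> Data conversion, Estimator.fit() gives a Transformer") can be shown with a tiny self-contained sketch. The types below are hypothetical stand-ins, not the flink-ml interfaces; the mean-centering example is invented purely to illustrate the fit-then-transform relation.

```java
import java.util.Arrays;

// Hypothetical minimal types illustrating the core semantics under
// discussion; these are NOT the flink-ml interfaces.
interface Transformer {            // Data -> Data conversion
    double[] transform(double[] data);
}

interface Estimator {              // Data -> Model conversion
    Transformer fit(double[] trainData);
}

public class PipelineSemantics {
    public static void main(String[] args) {
        // A toy "mean-centering" estimator: fit() learns the mean of the
        // training data, and the returned Transformer (the "model")
        // subtracts that mean at inference time.
        Estimator meanCenter = trainData -> {
            double mean = Arrays.stream(trainData).average().orElse(0.0);
            return data -> Arrays.stream(data).map(v -> v - mean).toArray();
        };

        Transformer model = meanCenter.fit(new double[] {1.0, 2.0, 3.0});
        System.out.println(Arrays.toString(model.transform(new double[] {2.0, 4.0})));
        // prints [0.0, 2.0]
    }
}
```

Both proposals want to preserve exactly this fit-then-transform shape; where they differ is whether nodes of this kind may also take multiple inputs and produce multiple outputs.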
[jira] [Created] (FLINK-23437) Connection leak in XaFacadePoolingImpl
Maciej Bryński created FLINK-23437:
-------------------------------------
Summary: Connection leak in XaFacadePoolingImpl
Key: FLINK-23437
URL: https://issues.apache.org/jira/browse/FLINK-23437
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.13.1
Reporter: Maciej Bryński

Hi,
I'm using the JDBC XA connector to put data into an Oracle database, and I'm facing an issue with too many concurrent connections to the database. I changed this method to return XaFacadeImpl instead of XaFacadePoolingImpl and the problem was solved.

{code:java}
static XaFacade fromXaDataSourceSupplier(
        Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) {
    return new XaFacadePoolingImpl(() -> new XaFacadeImpl(dataSourceSupplier, timeoutSec));
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: [DISCUSS] FLIP-173: Support DAG of algorithms (Flink ML)
Hi Becket, Thanks for the review! I totally agree that it would be easier for people to discuss if we can list the fundamental difference between these two proposals. (So I want to make the discussion even shorter) In my opinion, the fundamental difference between proposal-1 and proposal-2 is how they support the multi-input multi-output (MIMO) machine learning algorithms. Proposal-1 supports MIMO by extending Transformer/Estimator/Pipeline to take multiple inputs and output multiple outputs. Proposal-2 does not change the definition of Transformer/Estimator/Pipeline. Rather, to support MIMO it comes up with a new abstraction --- AlgoOperator, which is essentially an abstraction for machine learning functions. That is, proposal-2 employs well-recognized Transformer/Estimator/Pipeline to support single-input single-output (SISO) machine learning algorithms and AlgoOperator to support MIMO. In my opinion, the benefit of proposal-1 is that there is only one set of API and it is clean. However, it breaks the user habits (SISO for Transformer/Estimator/Pipeline). Users have to think more than before when using the new Transformer/Estimator/Pipeline. [1] The benefit of proposal-2 is that it does not change anything of the well-recognized Transformer/Estimator/Pipeline and existing users (e.g., Spark MLlib users) would be happy. However, as you mentioned, proposal-2 introduces a new abstraction (AlgoOperator), which may increase the burden for understanding. [1] https://docs.google.com/document/d/1L3aI9LjkcUPoM52liEY6uFktMnFMNFQ6kXAjnz_11do/edit#heading=h.c2qr9r64btd9 Thanks, Zhipeng Zhang Becket Qin 于2021年7月20日周二 上午11:42写道: > Hi Dong, Zhipeng and Fan, > > Thanks for the detailed proposals. It is quite a lot of reading! Given > that we are introducing a lot of stuff here, I find that it might be easier > for people to discuss if we can list the fundamental differences first. 
> From what I understand, the very fundamental difference between the two > proposals is the following: > > * In order to support graph structure, do we extend Transformer/Estimator, > or do we introduce a new set of APIs? * > > Proposal 1 tries to keep one set of APIs, which is based on > Transformer/Estimator/Pipeline. More specifically, it does the following: > - Make Transformer and Estimator multi-input and multi-output (MIMO). > - Introduce Graph/GraphModel as counterparts of > Pipeline/PipelineModel. > > Proposal 2 leaves the existing Transformer/Estimator/Pipeline as is. > Instead, it introduces AlgoOperators to support the graph structure. The > AlgoOperators are general-purpose graph nodes supporting MIMO. They are > independent of Pipeline / Transformer / Estimator. > > > My two cents: > > I think it is a big advantage to have a single set of APIs rather than two > independent sets of APIs, if possible. But I would suggest we change the > current proposal 1 a little bit, by learning from proposal 2. > > What I like about proposal 1: > 1. A single set of APIs, symmetric in Graph/GraphModel and > Pipeline/PipelineModel. > 2. Keeping most of the benefits of Transformer/Estimator, including the > fit-then-transform relation and the save/load capability. > > However, proposal 1 also introduces some changes that I am not sure about: > > 1. The most controversial part of proposal 1 is whether we should extend > Transformer/Estimator/Pipeline at all. In fact, different projects have > slightly different designs for Transformer/Estimator/Pipeline, so I think > it is OK to extend them. However, there are some commonly recognized core > semantics that we ideally want to keep. To me these core semantics are: > 1. Transformer is a Data -> Data conversion; Estimator deals with the Data > -> Model conversion. > 2. Estimator.fit() gives a Transformer, and users can just call > Transformer.transform() to perform inference.
> To me, as long as these core semantics are kept, extending the API > seems acceptable. > > Proposal 1 extends the semantics of Transformer from a Data -> Data > conversion to a generic Table -> Table conversion, and claims it is > equivalent to "AlgoOperator" in proposal 2 as a general-purpose graph node. > It does change the first semantic, though this might just be a naming > problem. One possible solution is to have a new > subclass of Stage without strong conventional semantics, e.g. "AlgoOp" if > we borrow the name from proposal 2, and let Transformer extend it. Just > like a PipelineModel is a more specific Transformer, a Transformer would be > a more specific "AlgoOp". If we do that, processing logic that people > don't feel comfortable calling a Transformer can just be put into an "AlgoOp" > and thus can still be added to a Pipeline / Graph. This borrows the > advantage of proposal 2. In other words, this essentially makes the > "AlgoOp" equivalent to "AlgoOperator" in proposal 2, but allows it to be > added to the Graph and Pipeline if people want to. > > This also gives my thoughts
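The "Transformer is a more specific AlgoOp" idea discussed above can be made concrete with a toy, self-contained sketch. These are hypothetical simplified interfaces written for illustration: a "Table" is just a List<Double> here, and none of the names are the proposed Flink ML classes.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Toy sketch of the core semantics under discussion: a general-purpose MIMO
 * graph node ("AlgoOp"), a SISO Transformer that specializes it, and an
 * Estimator whose fit() yields a Transformer. Illustrative stand-ins only.
 */
public class PipelineSemanticsSketch {

    /** General-purpose MIMO graph node ("AlgoOp"/"AlgoOperator" in the thread). */
    interface AlgoOp {
        List<List<Double>> compute(List<List<Double>> inputs);
    }

    /** A Transformer is a more specific AlgoOp: a SISO Data -> Data conversion. */
    interface Transformer extends AlgoOp {
        List<Double> transform(List<Double> data);

        default List<List<Double>> compute(List<List<Double>> inputs) {
            return List.of(transform(inputs.get(0)));
        }
    }

    /** Estimator handles the Data -> Model conversion; fit() yields a Transformer. */
    interface Estimator {
        Transformer fit(List<Double> trainData);
    }

    /** Toy Estimator: learns the max of the training data; the model divides by it. */
    static Estimator maxScaler() {
        return train -> {
            double max = train.stream().mapToDouble(Double::doubleValue).max().orElse(1.0);
            return (Transformer) data ->
                    data.stream().map(v -> v / max).collect(Collectors.toList());
        };
    }

    public static void main(String[] args) {
        Transformer model = maxScaler().fit(List.of(1.0, 2.0, 4.0));
        System.out.println(model.transform(List.of(2.0))); // [0.5]
    }
}
```

Note how the core semantics Becket lists survive: Estimator.fit() returns a Transformer, a Transformer is a Data -> Data conversion, and it still plugs into any graph of AlgoOps because it is one.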
Re: [DISCUSS] FLIP-177: Extend Sink API
Hi Arvid, Thank you for improving the Sink API through this FLIP. I have two small questions. 1. What do you think of us directly providing an interface as follows? In this way, there may be no need to expose the Mailbox to the user. We can implement an `AsyncSinkWriterOperator` to control the length of the queue: if it is too long, do not call SinkWriter::write.

public interface AsyncSinkWriter<T, CommT, WriterStateT>
        extends SinkWriter<Tuple2<T, XXXFuture<?>>, CommT, WriterStateT> {
    // please ignore the names of Tuple2 and XXXFuture at first
    int getQueueLimit();
}

2. Another question: if users don't care about exactly-once and the unification of stream and batch, how about letting them use `AsyncFunction` directly? I don't have an answer either; I want to hear your suggestions. Best, Guowei On Mon, Jul 19, 2021 at 3:38 PM Arvid Heise wrote: > Dear devs, > > today I'd like to start the discussion on the Sink API. I have drafted a > FLIP [1] with an accompanying PR [2]. > > This FLIP is a bit special as it's actually a few smaller Amend-FLIPs in > one. In this discussion, we should decide on the scope and cut out too > invasive steps if we can't reach an agreement. > > Step 1 is to add a few more pieces of information to context objects. > That's non-breaking and needed for the async communication pattern in > FLIP-171 [3]. While we need to add a new Public API (MailboxExecutor), I > think that this should entail the least discussions. > > Step 2 is to also offer the same context information to committers. Here we > can offer some compatibility methods to not break existing sinks. The main > use case would be some async exactly-once sink but I'm not sure if we would > use async communication patterns at all here (or simply wait for all async > requests to finish in a sync way). It may also help with async cleanup > tasks though. > > While drafting Step 2, I noticed the big entanglement of the current API.
> To figure out whether there is a committer during stream graph creation, we > actually need to create a committer, which can have unforeseen consequences. > Thus, I spiked whether we can disentangle the interface and have separate > interfaces for the different use cases. The resulting step 3 would be a > completely breaking change and thus is probably controversial. However, I'd > also see the disentanglement as a way to prepare to make Sinks more > expressive (write and commit coordinator) without completely overloading > the main interface. > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-177%3A+Extend+Sink+API > [2] https://github.com/apache/flink/pull/16399 > [3] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink >
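A rough sketch of how the getQueueLimit() idea from question 1 could be used by a hypothetical operator. All types below, including AsyncSinkWriter and ThrottlingOperator, are illustrative stand-ins invented for this sketch, not Flink APIs: the operator tracks in-flight async requests and stops handing elements to the writer once the limit is reached, draining the oldest request first.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/**
 * Toy sketch of back-pressuring an async sink writer via a queue limit.
 * Hypothetical stand-in types; not the FLIP-177/FLIP-171 interfaces.
 */
public class AsyncSinkQueueSketch {

    interface AsyncSinkWriter<T> {
        CompletableFuture<Void> write(T element); // issues an async request
        int getQueueLimit();                      // max in-flight requests
    }

    static class ThrottlingOperator<T> {
        private final AsyncSinkWriter<T> writer;
        private final Deque<CompletableFuture<Void>> inFlight = new ArrayDeque<>();

        ThrottlingOperator(AsyncSinkWriter<T> writer) {
            this.writer = writer;
        }

        void processElement(T element) {
            // Drop finished requests, then block on the oldest one while still full.
            inFlight.removeIf(CompletableFuture::isDone);
            while (inFlight.size() >= writer.getQueueLimit()) {
                inFlight.poll().join();
            }
            inFlight.add(writer.write(element));
        }

        int pendingRequests() {
            return inFlight.size();
        }
    }

    public static void main(String[] args) {
        AsyncSinkWriter<String> writer = new AsyncSinkWriter<>() {
            public CompletableFuture<Void> write(String e) {
                return CompletableFuture.completedFuture(null);
            }
            public int getQueueLimit() {
                return 2;
            }
        };
        ThrottlingOperator<String> op = new ThrottlingOperator<>(writer);
        op.processElement("a");
        op.processElement("b");
        op.processElement("c");
        System.out.println(op.pendingRequests() <= 2); // true
    }
}
```

The design question in the thread is whether this throttling belongs in a dedicated operator (as sketched) or whether exposing the MailboxExecutor to the writer is the more flexible primitive.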
[jira] [Created] (FLINK-23436) Lookup function in SQL that indirectly extends (Async)TableFunction throws an exception
吴彦祖 created FLINK-23436:
---
Summary: Lookup function in SQL that indirectly extends (Async)TableFunction throws an exception
Key: FLINK-23436
URL: https://issues.apache.org/jira/browse/FLINK-23436
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: 吴彦祖
Attachments: image-2021-07-20-15-19-48-794.png, image-2021-07-20-15-21-59-569.png

When I implement the inner LookupTableFunctions, I use an abstract class to hold the common fields and common implementations (mini-batch, local cache, etc.). The abstract class extends (Async)TableFunction in Flink 1.13, and all the inner LookupTableFunctions extend this class. !image-2021-07-20-15-19-48-794.png! I get the following exception when using lookup join in SQL:
{noformat}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not determine a type inference for lookup function 'default_catalog.default_database.xxx_lookup'. Lookup functions support regular type inference. However, for convenience, the output class can simply be a Row or RowData class in which case the input and output types are derived from the table's schema with default conversion.
    at org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.createLookupTypeInference(LookupJoinCodeGenerator.scala:270)
    at org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateLookupFunction(LookupJoinCodeGenerator.scala:166)
    at org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator$.generateSyncLookupFunction(LookupJoinCodeGenerator.scala:87)
    at org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator.generateSyncLookupFunction(LookupJoinCodeGenerator.scala)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createSyncLookupJoin(CommonExecLookupJoin.java:440)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:258)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:75)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate.translateToPlanInternal(StreamExecGroupWindowAggregate.java:192)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:75)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.java:146)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at
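The inheritance pattern the report describes can be sketched as follows. The classes below are stand-ins written for illustration, not the actual org.apache.flink.table.functions.TableFunction API: the concrete lookup function extends TableFunction only indirectly, through an abstract helper class holding the shared logic, which is reportedly the hierarchy that trips up the planner's type inference.

```java
/**
 * Sketch of an indirect (Async)TableFunction hierarchy, as described in the
 * report. Stand-in classes only; the real planner inspects generics on the
 * actual Flink TableFunction.
 */
public class IndirectLookupFunctionSketch {

    // Stand-in for org.apache.flink.table.functions.TableFunction<T>.
    abstract static class TableFunction<T> {
        abstract Class<?> resultClass();
    }

    // Intermediate abstract class holding the common fields and implementations
    // (mini-batch, local cache, ...) shared by all inner lookup functions.
    abstract static class AbstractLookupFunction<T> extends TableFunction<T> {
    }

    // Concrete lookup function: extends TableFunction only through the
    // intermediate class, i.e. the hierarchy the report says fails validation.
    static class MyLookupFunction extends AbstractLookupFunction<String> {
        Class<?> resultClass() {
            return String.class;
        }
    }

    public static void main(String[] args) {
        System.out.println(new MyLookupFunction().resultClass().getSimpleName()); // String
    }
}
```

In plain Java the generic argument (String) is still fully resolvable through the intermediate class, which is why the reporter considers the ValidationException a bug rather than an unsupported pattern.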
Re: [DISCUSS] FLIP-179: Expose Standardized Operator Metrics
Would it be easier to understand if the method accepted a Supplier instead? On 20/07/2021 05:36, Becket Qin wrote: In that case, do we still need the metric here? It seems we are creating a "global variable" which users may potentially use. I am wondering how much additional convenience it provides, because it seems easy for people to simply pass the fetch time by themselves if they have decided not to use SourceReaderBase. Also, it looks like we do not have an API pattern that lets users get the value of a metric and derive another metric. So I think it is easier for people to understand if LastFetchTimeGauge() is just an independent metric by itself, instead of being a part of the eventTimeFetchLag computation.
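The Supplier-based variant being suggested can be sketched as follows. The Gauge interface and gaugeFromSupplier helper below are toy stand-ins for illustration, not the actual Flink metrics API: the source passes a Supplier once, and both the standalone gauge and any derived metric read the current value through it.

```java
import java.util.function.Supplier;

/**
 * Toy sketch of exposing a last-fetch-time metric via a Supplier.
 * Stand-in types; not the Flink MetricGroup/Gauge API.
 */
public class FetchTimeGaugeSketch {

    interface Gauge<T> {
        T getValue();
    }

    /** Adapts a user-provided Supplier into a gauge read on demand. */
    static <T> Gauge<T> gaugeFromSupplier(Supplier<T> supplier) {
        return supplier::get;
    }

    public static void main(String[] args) {
        long[] lastFetchTime = {0L};
        Gauge<Long> lastFetchTimeGauge = gaugeFromSupplier(() -> lastFetchTime[0]);

        // The source updates the value on each fetch...
        lastFetchTime[0] = 1_626_750_000_000L;

        // ...and a derived metric (e.g. a fetch-lag computation) can read the
        // same supplier-backed gauge without a metric-reads-metric pattern.
        long now = 1_626_750_000_500L;
        System.out.println(now - lastFetchTimeGauge.getValue()); // 500
    }
}
```

The appeal of the Supplier form is that it sidesteps the concern quoted above: nothing needs to read a registered metric's value to compute eventTimeFetchLag, since both metrics share the underlying supplier.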
[jira] [Created] (FLINK-23435) Improve the exception message when a query has a lateral table join without 'AS' whose return type is a simple type
godfrey he created FLINK-23435:
--
Summary: Improve the exception message when a query has a lateral table join without 'AS' whose return type is a simple type
Key: FLINK-23435
URL: https://issues.apache.org/jira/browse/FLINK-23435
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 1.13.0, 1.14.0
Reporter: godfrey he
Fix For: 1.14.0, 1.13.3

When running the following query:
{code:sql}
SELECT * FROM MyTable, LATERAL TABLE(STRING_SPLIT(c, '|'))
{code}
we get the following exception:
{code:java}
org.apache.flink.table.api.ValidationException: SQL validation failed. null
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:155)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:195)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:576)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:260)
    at com.ververica.platform.sql.environment.VvpSqlParser.lambda$parse$5(VvpSqlParser.java:127)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at com.ververica.platform.sql.environment.VvpSqlParser.parse(VvpSqlParser.java:126)
    at com.ververica.platform.sql.service.SqlScriptValidationService$SqlScriptValidator.get(SqlScriptValidationService.java:156)
    at com.ververica.platform.sql.service.SqlScriptValidationService$SqlScriptValidator.get(SqlScriptValidationService.java:141)
    at com.ververica.platform.sql.classloader.ClassLoaderWrapper.execute(ClassLoaderWrapper.java:12)
    at com.ververica.platform.sql.service.SqlScriptValidationService.validate(SqlScriptValidationService.java:122)
    at com.ververica.platform.sql.service.SqlScriptValidationService.validate(SqlScriptValidationService.java:80)
    at com.ververica.platform.sql.service.SqlScriptValidationService.validate(SqlScriptValidationService.java:64)
    at com.ververica.platform.sql.service.SqlScriptValidationService.validate(SqlScriptValidationService.java:58)
    at com.ververica.platform.sql.service.SqlService.validateStatement(SqlService.java:465)
    at com.ververica.platform.sql.v1beta1.SqlServiceGrpc$MethodHandlers.invoke(SqlServiceGrpc.java:1194)
    at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:984)
Caused by: java.lang.NullPointerException
    at org.apache.calcite.rel.type.RelDataTypeFactoryImpl.addFields(RelDataTypeFactoryImpl.java:424)
    at org.apache.calcite.rel.type.RelDataTypeFactoryImpl.getFieldList(RelDataTypeFactoryImpl.java:389)
    at org.apache.calcite.rel.type.RelDataTypeFactoryImpl.createJoinType(RelDataTypeFactoryImpl.java:133)
    at org.apache.calcite.sql.validate.JoinNamespace.validateImpl(JoinNamespace.java:59)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3211)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3461)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:150)
    ... 24 more
{code}
The exception message is too obscure for users to figure out what the actual error is.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
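For context, such queries typically validate once the lateral table is given an 'AS' alias. A hedged example follows; the alias T and column name s are made up for illustration:
{code:sql}
SELECT * FROM MyTable, LATERAL TABLE(STRING_SPLIT(c, '|')) AS T(s)
{code}
The improvement requested here is that, when the alias is missing, the planner should say so explicitly instead of surfacing a NullPointerException from deep inside Calcite's join-type derivation.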