Re: [Discuss] Planning Flink 1.14
Hi all,

Thanks Xintong for bringing this up. I would like to share some experience with the usage of Flink in our company (ByteDance).

1. We started building our SQL platform in mid 2019, using the v1.9 blink planner, and it's amazing. We also added many internal features which are still missing in this version, including DDL, computed columns, a lot of internal formats and connectors, and some other planner changes.

2. In early 2020, we planned to upgrade to v1.10. Before we finished cherry-picking internal commits to v1.10, we found that v1.11 was going to be released soon, so we decided to upgrade to v1.11 instead. By late 2020, we had almost finished the internal feature cherry-picking work. (It took us so long because we were still adding new features to our online v1.9 version at the same time.)

3. Now, although we did a lot of work to reduce the overhead for our users when upgrading from v1.9 to v1.11, this process is still slow, because:
a) All the connector/format properties changed (although we have a tool to upgrade them in one click, users still face a lot of learning cost)
b) The checkpoint cannot be upgraded

4. Future: We have 5000+ online SQL jobs and hundreds of commits, so we do not plan to do another upgrade in the short term. However, v1.11 still lacks a lot of features, for example:
a) The new UDF type inference does not support aggregate functions
b) The FLIP-27 new source interface cannot be used in SQL
We may need to do a lot of cherry-picking onto our v1.11.

So, from our point of view, a longer release cycle and more fully finished features would benefit us a lot.

JING ZHANG wrote on Fri, Jun 4, 2021 at 6:02 PM:
> Hi all,
>
> @Xintong Song
> Thanks for reminding me, I will contact Jark to update the wiki page.
>
> Besides, I'd like to provide more input by sharing our experience of
> upgrading the internal version of Flink.
>
> Flink has been widely used in the production environment since 2018 in our
> company. Our internal version is behind the latest stable version of the
> community by about 1 year.
> We upgraded the internal Flink version to 1.10 in March last year, and we
> plan to upgrade directly to 1.13 next month (skipping the 1.11 and 1.12
> versions). We wish to use the latest version as soon as possible. In
> practice, however, we follow the community's latest stable release only
> about once a year, because upgrading to a new version is a time-consuming
> process.
>
> I list the detailed work as follows.
>
> a. Before releasing a new internal version
> 1) Required: Cherry-pick internal features to the new Flink branch. A few
> features need to be redeveloped based on the new branch code base.
> BTW, this cost grows heavier and heavier as we maintain more and more
> internal features in our internal version.
> 2) Optional: Some internal connectors need to adapt to the new API.
> 3) Required: Surrounding products need to be updated based on the new API,
> for example, the internal Flink SQL web development platform.
> 4) Required: Regression tests.
>
> b. After the release, encourage users to upgrade existing jobs (thousands
> of jobs) to the new version. Users need some time to:
> 1) Repackage jars for DataStream jobs.
> 2) For critical jobs, run the job on both versions at the same time for a
> while, and migrate to the new job only after comparing the data carefully.
> 3) Pure ETL SQL jobs are easy to bump up, but other Flink SQL jobs with
> stateful operators need extra effort because Flink SQL jobs do not
> support state compatibility yet.
>
> Best regards,
> JING ZHANG
>
> Prasanna kumar wrote on Fri, Jun 4, 2021 at 2:27 PM:
>
> > Hi all,
> >
> > We are using Flink for our eventing system. Overall we are very happy
> > with the tech, documentation, community support, and quick replies on
> > the mailing lists.
> >
> > My experience with versions over the last year:
> >
> > We were working on 1.10 initially during our research phase, then
> > stabilized on 1.11 as we moved on, but by the time we were about to go
> > into production, 1.12 was released.
> > As with all software and products, there were bugs reported, so we
> > waited till 1.12.2 was released and then upgraded. Within a month of us
> > doing so, 1.13 got released.
> >
> > Based on past experience, we wait until at least a couple of minor
> > (bug-fix) versions are released before we move onto a newer version.
> > Development happens at a rapid pace in Flink (which is good in terms of
> > features), but adopting and moving production code to a newer version
> > 3-4 times a year is an onerous effort. For example, the memory model
> > was changed in one of the releases (there is good documentation for it),
> > but for a production user to adopt the newer version, at least a month
> > of testing is required in a large-scale environment. We also do not want
> > to be behind by more than 2 versions at any point in time.
> >
> > I personally feel 2 major releases a year, or at most a release every 5
> > months, is good.
> >
> > Thanks,
> > Prasanna.
> >
> > On Fri, Jun 4, 2021 at 9:38 AM
[jira] [Created] (FLINK-22895) Table common document misspelled
sujun created FLINK-22895:
-----------------------------

             Summary: Table common document misspelled
                 Key: FLINK-22895
                 URL: https://issues.apache.org/jira/browse/FLINK-22895
             Project: Flink
          Issue Type: Bug
          Components: Documentation
    Affects Versions: 1.14.0
            Reporter: sujun
         Attachments: image-2021-06-07-11-04-30-998.png

The variable name is misspelled: {{setting}} should be changed to {{settings}}. The document URL is:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/#create-a-tableenvironment

!image-2021-06-07-11-04-30-998.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: Add control mode for flink
Thank you for the reply. I have checked the post you mentioned. The dynamic config approach may be useful sometimes, but it is hard to keep data consistent in Flink; for example, when does a dynamic config change take effect relative to a failover? Since dynamic config is something users want, maybe Flink can support it in some way.

For the control mode, dynamic config is just one of the control modes. In the google doc, I have listed some other cases, for example, control events generated in operators or by external services. Besides users' dynamic config, the Flink system itself can support some common dynamic configuration, like QPS limits, checkpoint control, and so on.

It needs a good design to handle the control mode structure. Based on that, other control features can be added easily later, like changing the log level while a job is running. In the end, Flink will not just process data, but also interact with users to receive control events, like a service.

Steven Wu wrote on Fri, Jun 4, 2021 at 11:11 PM:
> I am not sure if we should solve this problem in Flink. This is more like
> a dynamic config problem that probably should be solved by some
> configuration framework. Here is one post from google search:
> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>
> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 wrote:
>
>> Hi everyone,
>>
>> Flink jobs are always long-running. While a job is running, users may
>> want to control the job without stopping it. The reasons for control can
>> differ, for example:
>>
>> 1. Change the data processing logic, such as a filter condition.
>> 2. Send trigger events to push progress forward.
>> 3. Provide tools to degrade the job, such as limiting input QPS or
>>    sampling data.
>> 4. Change the log level to debug a current problem.
>>
>> The common way to do this is to stop the job, make the modifications,
>> and restart the job. It may take a long time to recover.
>> In some situations, stopping the job is intolerable, for example when
>> the job is related to money or important activities. So we need some way
>> to control the running job without stopping it.
>>
>> We propose to add a control mode for Flink. A control mode based on a
>> restful interface is introduced first. It works in these steps:
>>
>> 1. The user predefines some logic which supports config control, such
>>    as a filter condition.
>> 2. Run the job.
>> 3. If the user wants to change the job's running logic, they just send
>>    a restful request with the corresponding config.
>>
>> Other control modes will also be considered in the future. More
>> details can be found in the doc:
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> If the community likes the proposal, more discussion is needed and a
>> more detailed design will be given later. Any suggestions and ideas are
>> welcome.
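The three steps in the proposal above can be sketched in plain Java. This is an illustrative pattern only, not Flink's API: all class and method names below are invented for the sketch. The idea is a filter whose condition can be swapped at runtime by a control-endpoint handler, with an AtomicReference making the update visible to the processing thread.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

// Minimal sketch (not Flink API): a filter whose condition can be
// replaced at runtime, e.g. by a handler behind a REST endpoint.
public class DynamicFilter {
    // Holds the current filter condition; AtomicReference makes the
    // swap visible to the thread processing records.
    private final AtomicReference<Predicate<Integer>> condition;

    public DynamicFilter(Predicate<Integer> initial) {
        this.condition = new AtomicReference<>(initial);
    }

    // Called by the (hypothetical) control endpoint with the new config.
    public void updateCondition(Predicate<Integer> next) {
        condition.set(next);
    }

    // Called per record on the processing path.
    public boolean accept(int value) {
        return condition.get().test(value);
    }

    public static void main(String[] args) {
        DynamicFilter filter = new DynamicFilter(v -> v > 10);
        System.out.println(filter.accept(15)); // passes the initial condition

        // Simulate a control request changing the filter condition.
        filter.updateCondition(v -> v > 20);
        System.out.println(filter.accept(15)); // now rejected
    }
}
```

A real implementation would also have to decide how such updates interact with checkpoints and failover, which is exactly the consistency concern raised above.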
Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
Thanks Yingjie for the great effort! This is really helpful to Flink Batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao wrote:
> Hi devs & users,
>
> FLIP-148[1] has been released with Flink 1.13, and the final
> implementation has some differences compared with the initial proposal in
> the FLIP document. To avoid potential misunderstandings, I have updated
> the FLIP document[1] accordingly, and I have also drafted another
> document[2] which contains more implementation details. FYI.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> [2]
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>
> Best,
> Yingjie
>
> Yingjie Cao wrote on Thu, Oct 15, 2020 at 11:02 AM:
>
>> Hi devs,
>>
>> Currently, Flink adopts a hash-style blocking shuffle implementation
>> which writes the data sent to different reducer tasks into separate
>> files concurrently. Compared to the sort-merge based approach, which
>> writes that data together into a single file and merges the small files
>> into bigger ones, the hash-based approach has several weak points when
>> it comes to running large-scale batch jobs:
>>
>> 1. *Stability*: For a high-parallelism (tens of thousands) batch job,
>>    the current hash-based blocking shuffle implementation writes too
>>    many files concurrently, which puts high pressure on the file system,
>>    for example, maintenance of too many file metas and exhaustion of
>>    inodes or file descriptors. All of these can be potential stability
>>    issues. The sort-merge based blocking shuffle does not have this
>>    problem because, for one result partition, only one file is written
>>    at a time.
>> 2. *Performance*: Large numbers of small shuffle files and random IO
>>    can influence shuffle performance a lot, especially for HDDs (for
>>    SSDs, sequential read is also important because of read-ahead and
>>    caching). For batch jobs processing massive data, a small amount of
>>    data per subpartition is common because of high parallelism. Besides,
>>    data skew is another cause of small subpartition files. By merging
>>    the data of all subpartitions together into one file, more sequential
>>    reads can be achieved.
>> 3. *Resource*: For the current hash-based implementation, each
>>    subpartition needs at least one buffer. For large-scale batch
>>    shuffles, the memory consumption can be huge. For example, we need at
>>    least 320M of network memory per result partition if parallelism is
>>    set to 10000, and because of the huge network memory consumption, it
>>    is hard to configure the network memory for large-scale batch jobs;
>>    sometimes the parallelism cannot be increased just because of
>>    insufficient network memory, which leads to a bad user experience.
>>
>> To improve Flink's capability of running large-scale batch jobs, we
>> would like to introduce the sort-merge based blocking shuffle to
>> Flink[1]. Any feedback is appreciated.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>
>> Best,
>> Yingjie
>

--
Best, Jingsong Lee
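The "320M network memory per result partition" figure in the *Resource* point can be sanity-checked with simple arithmetic. The sketch below assumes Flink's default network buffer size of 32 KB and one buffer per subpartition at a parallelism of 10000; the buffer size is an assumption, not something stated in this thread.

```java
// Back-of-envelope check of the per-result-partition network memory
// for the hash-based shuffle, assuming one 32 KB buffer per
// subpartition (assumption: 32 KB is Flink's default buffer size).
public class ShuffleMemoryEstimate {
    public static void main(String[] args) {
        int parallelism = 10_000;          // subpartitions per result partition
        int bufferKB = 32;                 // network buffer size in KB
        long perPartitionKB = (long) parallelism * bufferKB;
        // 10000 * 32 KB = 320000 KB, i.e. roughly the 320M quoted above.
        System.out.println(perPartitionKB + " KB ~= " + perPartitionKB / 1000 + " MB");
    }
}
```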
[jira] [Created] (FLINK-22894) Window Top-N should allow n=1
David Anderson created FLINK-22894:
--------------------------------------

             Summary: Window Top-N should allow n=1
                 Key: FLINK-22894
                 URL: https://issues.apache.org/jira/browse/FLINK-22894
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.13.1
            Reporter: David Anderson

I tried to reimplement the Hourly Tips exercise from the DataStream training using Flink SQL. The objective of this exercise is to find the one taxi driver who earned the most in tips during each hour, and report that driver's driverId and the sum of their tips. This can be expressed as a window top-n query, where n=1, as in

{code:sql}
FROM (
  SELECT *, ROW_NUMBER() OVER
    (PARTITION BY window_start, window_end ORDER BY sumOfTips DESC) as rownum
  FROM (
    SELECT driverId, window_start, window_end, sum(tip) as sumOfTips
    FROM TABLE(
      TUMBLE(TABLE fares, DESCRIPTOR(startTime), INTERVAL '1' HOUR))
    GROUP BY driverId, window_start, window_end
  )
) WHERE rownum = 1;
{code}

This fails because the {{WindowRankOperatorBuilder}} insists on {{rankEnd > 1}}. So, in other words, while it is possible to report the top 2 drivers, or the driver in 2nd place, it's not possible to report only the top driver. This appears to be an off-by-one error in the range checking.
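The off-by-one error described in this report can be illustrated with a small sketch. The names below are invented for illustration and are not the actual {{WindowRankOperatorBuilder}} code: a strict {{> 1}} precondition rejects the legitimate n=1 case, while {{>= 1}} accepts it.

```java
// Hypothetical sketch of the off-by-one range check described above
// (illustrative names, not the actual WindowRankOperatorBuilder code).
public class RankRangeCheck {
    // Buggy variant: rejects rankEnd == 1, so "top 1" queries fail.
    static void checkBuggy(long rankEnd) {
        if (!(rankEnd > 1)) {
            throw new IllegalArgumentException("rankEnd must be > 1, got " + rankEnd);
        }
    }

    // Fixed variant: top-N with N = 1 is a perfectly valid query.
    static void checkFixed(long rankEnd) {
        if (!(rankEnd >= 1)) {
            throw new IllegalArgumentException("rankEnd must be >= 1, got " + rankEnd);
        }
    }

    public static void main(String[] args) {
        checkFixed(1); // accepted: reporting only the top driver is valid
        try {
            checkBuggy(1); // throws: this is the bug for n=1
        } catch (IllegalArgumentException e) {
            System.out.println("buggy check rejects n=1: " + e.getMessage());
        }
    }
}
```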
[jira] [Created] (FLINK-22893) ResumeCheckpointManuallyITCase hangs on azure
Dawid Wysakowicz created FLINK-22893:
----------------------------------------

             Summary: ResumeCheckpointManuallyITCase hangs on azure
                 Key: FLINK-22893
                 URL: https://issues.apache.org/jira/browse/FLINK-22893
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.14.0
            Reporter: Dawid Wysakowicz

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=2c7d57b9-7341-5a87-c9af-2cf7cc1a37dc=4382
[jira] [Created] (FLINK-22892) Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test fails
Dawid Wysakowicz created FLINK-22892:
----------------------------------------

             Summary: Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test fails
                 Key: FLINK-22892
                 URL: https://issues.apache.org/jira/browse/FLINK-22892
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.14.0
            Reporter: Dawid Wysakowicz

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=739e6eac-8312-5d31-d437-294c4d26fced=a68b8d89-50e9-5977-4500-f4fde4f57f9b=921

{code}
Jun 05 20:47:11 Running 'Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test'
Jun 05 20:47:11 ==
Jun 05 20:47:11 TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-11283890527
Jun 05 20:47:11 Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.14-SNAPSHOT-bin/flink-1.14-SNAPSHOT
Jun 05 20:47:11 Starting cluster.
Jun 05 20:47:12 Starting standalonesession daemon on host fv-az83-351.
Jun 05 20:47:13 Starting taskexecutor daemon on host fv-az83-351.
Jun 05 20:47:13 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:14 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:16 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:17 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:18 Dispatcher REST endpoint is up.
Jun 05 20:47:18 Running externalized checkpoints test, with ORIGINAL_DOP=4 NEW_DOP=2 and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ...
Jun 05 20:47:25 Job (b84a167a07b862a9dbbcdc6a5969f75c) is running.
Jun 05 20:47:25 Waiting for job (b84a167a07b862a9dbbcdc6a5969f75c) to have at least 1 completed checkpoints ...
Jun 05 20:57:30 A timeout occurred waiting for job (b84a167a07b862a9dbbcdc6a5969f75c) to have at least 1 completed checkpoints .
Jun 05 20:57:30 Stopping job timeout watchdog (with pid=23016)
Jun 05 20:57:30 [FAIL] Test script contains errors.
Jun 05 20:57:30 Checking of logs skipped.
Jun 05 20:57:30
Jun 05 20:57:30 [FAIL] 'Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test' failed after 10 minutes and 19 seconds! Test exited with exit code 1
Jun 05 20:57:30
{code}
[jira] [Created] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
Dawid Wysakowicz created FLINK-22891:
----------------------------------------

             Summary: FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
                 Key: FLINK-22891
                 URL: https://issues.apache.org/jira/browse/FLINK-22891
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.14.0
            Reporter: Dawid Wysakowicz

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0=8660

{code}
Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.24 s <<< FAILURE! - in org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
Jun 05 21:16:00 [ERROR] testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)  Time elapsed: 5.015 s  <<< ERROR!
Jun 05 21:16:00 java.util.concurrent.TimeoutException
Jun 05 21:16:00 	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
Jun 05 21:16:00 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.<init>(AbstractFineGrainedSlotManagerITCase.java:310)
Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
Jun 05 21:16:00 	at org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
Jun 05 21:16:00 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Jun 05 21:16:00 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jun 05 21:16:00 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun 05 21:16:00 	at java.lang.reflect.Method.invoke(Method.java:498)
Jun 05 21:16:00 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Jun 05 21:16:00 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Jun 05 21:16:00 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Jun 05 21:16:00 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Jun 05 21:16:00 	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
Jun 05 21:16:00 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Jun 05 21:16:00 	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Jun 05 21:16:00 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Jun 05 21:16:00 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Jun 05 21:16:00 	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Jun 05 21:16:00 	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
Jun 05 21:16:00 	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
Jun 05 21:16:00 	at
[jira] [Created] (FLINK-22890) Few tests fail in HiveTableSinkITCase
Dawid Wysakowicz created FLINK-22890:
----------------------------------------

             Summary: Few tests fail in HiveTableSinkITCase
                 Key: FLINK-22890
                 URL: https://issues.apache.org/jira/browse/FLINK-22890
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive, Table SQL / Ecosystem
    Affects Versions: 1.13.1
            Reporter: Dawid Wysakowicz

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18692=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=420bd9ec-164e-562e-8947-0dacde3cec91=23189

{code}
Jun 05 01:22:13 [ERROR] Errors:
Jun 05 01:22:13 [ERROR]   HiveTableSinkITCase.testBatchAppend:138 » Validation Could not execute CREATE ...
Jun 05 01:22:13 [ERROR]   HiveTableSinkITCase.testDefaultSerPartStreamingWrite:156->testStreamingWrite:494 » Validation
Jun 05 01:22:13 [ERROR]   HiveTableSinkITCase.testHiveTableSinkWithParallelismInStreaming:100->testHiveTableSinkWithParallelismBase:108 » Validation
Jun 05 01:22:13 [ERROR]   HiveTableSinkITCase.testPartStreamingMrWrite:179->testStreamingWrite:423 » Validation
Jun 05 01:22:13 [ERROR]   HiveTableSinkITCase.testStreamingSinkWithTimestampLtzWatermark:360->fetchRows:384 » TestTimedOut
{code}
[jira] [Created] (FLINK-22889) JdbcExactlyOnceSinkE2eTest hangs on azure
Dawid Wysakowicz created FLINK-22889:
----------------------------------------

             Summary: JdbcExactlyOnceSinkE2eTest hangs on azure
                 Key: FLINK-22889
                 URL: https://issues.apache.org/jira/browse/FLINK-22889
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.14.0
            Reporter: Dawid Wysakowicz
             Fix For: 1.14.0

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18690=logs=ba53eb01-1462-56a3-8e98-0dd97fbcaab5=bfbc6239-57a0-5db0-63f3-41551b4f7d51=16658
Re: [DISCUSS]FLIP-150: Introduce Hybrid Source
> Converter function relies on the specific enumerator capabilities to set
> the new start position (e.g. fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))

I guess the premise is that a converter is for a specific tuple of (upstream source, downstream source). We don't have to define generic EndStateT and SwitchableEnumerator interfaces. That should work.

The benefit of defining EndStateT and SwitchableEnumerator interfaces is probably promoting uniformity across sources that support hybrid/switchable behavior.

On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise wrote:
> Hi Steven,
>
> Thank you for the thorough review of the PR and for bringing this back
> to the mailing list.
>
> All,
>
> I updated the FLIP-150 page to highlight aspects in which the PR
> deviates from the original proposal [1]. The goal would be to update
> the FLIP soon and bring it to a vote, as previously suggested offline
> by Nicholas.
>
> A few minor issues in the PR are outstanding and I'm working on test
> coverage for the recovery behavior, which should be completed soon.
>
> The dynamic position transfer needs to be concluded before we can move
> forward, however.
>
> There have been various ideas, including the special
> "SwitchableEnumerator" interface, using the enumerator checkpoint state,
> or an enumerator interface extension to extract the end state.
>
> One goal in the FLIP is to "Reuse the existing Source connectors built
> with FLIP-27 without any change." and I think it is important to honor
> that goal, given that fixed start positions do not require interface
> changes.
>
> Based on the feedback, the following might be a good solution for
> runtime position transfer:
>
> * The user supplies the optional converter function (not applicable for
>   fixed positions).
> * Instead of relying on the enumerator checkpoint state [2], the
>   converter function will be supplied with the current and next
>   enumerator (source.createEnumerator).
> * The converter function relies on the specific enumerator capabilities
>   to set the new start position (e.g.
>   fileSourceEnumerator.getEndTimestamp() and
>   kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> * HybridSourceSplitEnumerator starts the new underlying enumerator
>
> With this approach, there is no need to augment FLIP-27 interfaces, and
> custom source capabilities are easier to integrate. Removing the
> mandate to rely on enumerator checkpoint state also avoids potential
> upgrade/compatibility issues.
>
> Thoughts?
>
> Thanks,
> Thomas
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> [2]
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
>
> On Tue, Jun 1, 2021 at 3:10 PM Steven Wu wrote:
> >
> > I discussed the PR with Thomas offline. Thomas, please correct me if I
> > missed anything.
> >
> > Right now, the PR differs from the FLIP-150 doc regarding the converter:
> > * The current PR uses the enumerator checkpoint state type as the input
> >   for the converter.
> > * FLIP-150 defines a new EndStateT interface.
> > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > the transition EndStateT doesn't have to be included in the upstream
> > source checkpoint state.
> >
> > Let's look at two use cases:
> > 1) Static cutover time at 5 pm. The file source reads all data between
> > 9 am and 5 pm, then the Kafka source starts with an initial position of
> > 5 pm. In this case, there is no need for a converter or EndStateT since
> > the starting time for the Kafka source is known and fixed.
> > 2) Dynamic cutover time at 1 hour before now. This is useful when the
> > bootstrap of historic data takes a long time (like days or weeks) and
> > we don't know the exact time of cutover when a job is launched.
> > Instead, we are instructing the file source to stop when it gets close
> > to live data. In this case, the hybrid source construction will specify
> > a relative time (now - 1 hour), and the EndStateT (of the file source)
> > will be resolved to an absolute time for the cutover. We probably don't
> > need to include EndStateT (the end timestamp) in the file source
> > checkpoint state. Hence, the separate EndStateT is probably more
> > desirable.
> >
> > We also discussed the converter for the Kafka source. The Kafka source
> > supports different OffsetsInitializer impls (including
> > TimestampOffsetsInitializer). To support the dynamic cutover time (use
> > case #2 above), we can plug in a SupplierTimestampOffsetInitializer,
> > where the starting timestamp is not set during source/job construction.
> > Rather, it is a supplier model where the starting timestamp value is
> > set to the resolved absolute timestamp during the switch.
> >
> > Thanks,
> > Steven
> >
> >
> >
> > On Thu, May 20, 2021 at 8:59 PM Thomas Weise wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for taking a look at
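The runtime position transfer discussed in this thread can be sketched in plain Java. Everything below is an illustrative stand-in, not Flink's FLIP-27 API: the enumerator classes are invented for the sketch, and the method names deliberately echo the hypothetical getEndTimestamp()/setTimestampOffsetsInitializer() examples from the emails. The point is that the user-supplied converter alone bridges the two concrete enumerators, so no generic EndStateT or SwitchableEnumerator interface is required.

```java
import java.util.function.BiConsumer;

// Plain-Java sketch of the runtime position transfer described above.
// All types/names are illustrative stand-ins, not Flink's FLIP-27 API.
public class HybridSwitchSketch {
    // Stand-in for the file source enumerator exposing its end position.
    static class FileEnumerator {
        long getEndTimestamp() { return 1_622_800_000_000L; }
    }

    // Stand-in for the Kafka enumerator accepting a start position.
    static class KafkaEnumerator {
        long startTimestamp = -1;
        void setStartTimestamp(long ts) { this.startTimestamp = ts; }
    }

    public static void main(String[] args) {
        FileEnumerator current = new FileEnumerator();
        KafkaEnumerator next = new KafkaEnumerator();

        // The user-supplied converter: it alone knows the capabilities of
        // the concrete (upstream, downstream) enumerator pair, so no
        // generic EndStateT / SwitchableEnumerator interface is needed.
        BiConsumer<FileEnumerator, KafkaEnumerator> converter =
                (cur, nxt) -> nxt.setStartTimestamp(cur.getEndTimestamp());

        converter.accept(current, next); // invoked at switch time
        System.out.println("Kafka starts at " + next.startTimestamp);
    }
}
```

For fixed start positions (use case #1), the converter step is simply skipped and the downstream source is configured as it would be standalone.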
Re: [DISCUSS]FLIP-150: Introduce Hybrid Source
Hi Steven,

Thank you for the thorough review of the PR and for bringing this back to the mailing list.

All,

I updated the FLIP-150 page to highlight aspects in which the PR deviates from the original proposal [1]. The goal would be to update the FLIP soon and bring it to a vote, as previously suggested offline by Nicholas.

A few minor issues in the PR are outstanding and I'm working on test coverage for the recovery behavior, which should be completed soon.

The dynamic position transfer needs to be concluded before we can move forward, however.

There have been various ideas, including the special "SwitchableEnumerator" interface, using the enumerator checkpoint state, or an enumerator interface extension to extract the end state.

One goal in the FLIP is to "Reuse the existing Source connectors built with FLIP-27 without any change." and I think it is important to honor that goal, given that fixed start positions do not require interface changes.

Based on the feedback, the following might be a good solution for runtime position transfer:

* The user supplies the optional converter function (not applicable for fixed positions).
* Instead of relying on the enumerator checkpoint state [2], the converter function will be supplied with the current and next enumerator (source.createEnumerator).
* The converter function relies on the specific enumerator capabilities to set the new start position (e.g. fileSourceEnumerator.getEndTimestamp() and kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
* HybridSourceSplitEnumerator starts the new underlying enumerator

With this approach, there is no need to augment FLIP-27 interfaces, and custom source capabilities are easier to integrate. Removing the mandate to rely on enumerator checkpoint state also avoids potential upgrade/compatibility issues.

Thoughts?
Thanks,
Thomas

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
[2] https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281

On Tue, Jun 1, 2021 at 3:10 PM Steven Wu wrote:
>
> Discussed the PR with Thomas offline. Thomas, please correct me if I missed anything.
>
> Right now, the PR differs from the FLIP-150 doc regarding the converter.
> * The current PR uses the enumerator checkpoint state type as the input for the converter.
> * FLIP-150 defines a new EndStateT interface.
> It seems that the FLIP-150 approach of EndStateT is more flexible, as the transition EndStateT doesn't have to be included in the upstream source checkpoint state.
>
> Let's look at two use cases:
> 1) Static cutover time at 5 pm. The file source reads all data between 9 am and 5 pm, then the Kafka source starts with an initial position of 5 pm. In this case, there is no need for a converter or EndStateT, since the starting time for the Kafka source is known and fixed.
> 2) Dynamic cutover time at 1 hour before now. This is useful when the bootstrap of historic data takes a long time (like days or weeks) and we don't know the exact time of cutover when a job is launched. Instead, we are instructing the file source to stop when it gets close to live data. In this case, the hybrid source construction will specify a relative time (now - 1 hour), and the EndStateT (of the file source) will be resolved to an absolute time for cutover. We probably don't need to include EndStateT (end timestamp) in the file source checkpoint state. Hence, the separate EndStateT is probably more desirable.
>
> We also discussed the converter for the Kafka source. The Kafka source supports different OffsetsInitializer impls (including TimestampOffsetsInitializer).
> To support the dynamic cutover time (use case #2 above), we can plug in a SupplierTimestampOffsetInitializer, where the starting timestamp is not set during source/job construction. Rather, it is a supplier model where the starting timestamp value is set to the resolved absolute timestamp during the switch.
>
> Thanks,
> Steven
>
>
>
> On Thu, May 20, 2021 at 8:59 PM Thomas Weise wrote:
> >
> > Hi Nicholas,
> >
> > Thanks for taking a look at the PR!
> >
> > 1. Regarding the switching mechanism:
> >
> > There has been previous discussion in this thread regarding the pros and cons of how the switching can be exposed to the user.
> >
> > With fixed start positions, no special switching interface to transfer information between enumerators is required. Sources are configured as they would be when used standalone and just plugged into HybridSource. I expect that to be a common use case. You can find an example for this in the ITCase:
> >
> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> >
> > For dynamic start position, the checkpoint state is used to transfer information from old to new enumerator. An example for that can be found here:
> >
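The supplier model described in the quoted discussion could be sketched as follows. This is a self-contained illustration of the idea only; `SupplierTimestampOffsetsInitializer` and `resolveStartingTimestamp` are hypothetical names (the real Kafka connector would implement its `OffsetsInitializer` interface), and the timestamp value is a placeholder.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class SupplierTimestampSketch {

    // Instead of a timestamp fixed at job construction, the initializer
    // holds a LongSupplier that is queried lazily, at switch time.
    static class SupplierTimestampOffsetsInitializer {
        private final LongSupplier timestampSupplier;

        SupplierTimestampOffsetsInitializer(LongSupplier supplier) {
            this.timestampSupplier = supplier;
        }

        // Called when the Kafka enumerator actually starts, i.e. at cutover.
        long resolveStartingTimestamp() {
            return timestampSupplier.getAsLong();
        }
    }

    public static void main(String[] args) {
        AtomicLong cutover = new AtomicLong(-1L); // unset at job construction
        SupplierTimestampOffsetsInitializer init =
                new SupplierTimestampOffsetsInitializer(cutover::get);

        cutover.set(1_622_505_600_000L); // resolved absolute timestamp, set during the switch
        System.out.println("Resolved start: " + init.resolveStartingTimestamp());
    }
}
```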
Job gets stuck with exactly once checkpointing and Kafka when initializing.
Dear Flink Community,

When using exactly-once checkpointing along with Kafka, and connecting to Kafka using SASL_SSL, the job gets stuck with the Kafka producer in the Initializing state for around 10 mins, and after that it functions normally across multiple checkpoints. Below are the various configs and observations.

Flink/Job config:
VERSION: Flink 1.13.1 for both env and jars.
1. Single node with checkpointing in exactly-once mode and a checkpointing interval of 2 minutes. (Tried with 10 minutes, still the same issue.)
2. The job has a Flink Kafka consumer for topic 1 as source and a Flink Kafka producer for topic 2 as sink, with parallelism 4 for both source & sink.
3. The Kafka producer uses the FlinkKafkaProducer implementation in EXACTLY_ONCE mode and connects to the broker using SASL_SSL, where the SSL certificate is self-signed.

Kafka config:
VERSION: 2.8 for the broker. For the Flink job, tried 2.4.1 (shipped with the Flink Kafka connector) and 2.8.0.
1. Single broker with both topics having 4 partitions.
2. Everything works correctly via the Kafka console commands when using SASL_SSL mode.

Observations from debug logs:
Background: In the dev env with a PLAINTEXT connection everything was working perfectly. When I changed PLAINTEXT to SASL_SSL I was getting timeout exceptions, which were resolved after I increased max.block.ms from 6 to 9, but then I started facing the delayed start issue.
1. About the error/exception: I did not find any exception in the debug logs, but I noticed the following pattern, which keeps repeating until successful initialisation:
SSL_HANDSHAKE --> SCRAM_AUTHENTICATION --> FIND_COORDINATOR --> FETCH PRODUCER_ID --> Transition from state INITIALIZING to READY --> CLOSE PRODUCER
The producer keeps repeating the above steps until the following pattern occurs:
SSL_HANDSHAKE --> SCRAM_AUTHENTICATION --> FIND_COORDINATOR --> FETCH PRODUCER_ID --> Transition from state INITIALIZING to READY --> READY to IN_TRANSACTION.
Also, while the Kafka producer/sink is stuck in the Initializing state, if I cancel the job the Kafka sink operator gets stuck and I get the following error: "Task did not exit gracefully within 180 + seconds." and the task manager crashes.
2. Turning off SSL and using SASL_PLAINTEXT: When connecting to the Kafka broker in SASL_PLAINTEXT mode, the Kafka producer gets initialised and starts processing data in 50-60 secs.
3. Disabling checkpointing: When checkpointing is disabled and SASL_SSL/SASL_PLAINTEXT mode is used for Kafka, the Kafka producer gets initialised immediately, but the Kafka consumer or sink takes 10-15 secs to initialise, and then the app operators start processing data.
4. I wrote a producer directly using the KafkaProducer from the Kafka library and added transactions to it. My observation was that the kafkaProducer.initTransactions() method takes around 30 secs to execute, after which it starts publishing data to the Kafka topic.
5. The same pattern of delayed start seems to appear in case of a crash and automatic restart of the job.
6. No exception in the Flink Dashboard UI for the job.

PFA, a copy of part of the log.
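Not from the report itself, but for reference: a hedged sketch of the client properties that typically matter for a transactional producer over SASL_SSL. The key names are real Kafka client configs; all values (truststore path, password, mechanism, timeouts) are placeholders to adapt, and the relation between max.block.ms and the broker-side transaction.max.timeout.ms is the usual pitfall in this setup.

```java
import java.util.Properties;

public class KafkaSaslSslProps {

    // Builds client properties commonly involved in this setup.
    // All values are placeholders; adjust to your environment.
    public static Properties producerProps() {
        Properties props = new Properties();
        // Transport + authentication
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
        props.setProperty("ssl.truststore.location", "/path/to/truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        // initTransactions() blocks for up to max.block.ms while the
        // transaction coordinator is located and the producer id is fetched,
        // which matches the observed delayed start.
        props.setProperty("max.block.ms", "90000");
        // Must not exceed the broker's transaction.max.timeout.ms
        // (15 minutes by default), or the producer fails at initialization.
        props.setProperty("transaction.timeout.ms", "900000");
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```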
[jira] [Created] (FLINK-22888) Matches results may be wrong when using notNext as the last part of the pattern with Window
Yue Ma created FLINK-22888:
--
Summary: Matches results may be wrong when using notNext as the last part of the pattern with Window
Key: FLINK-22888
URL: https://issues.apache.org/jira/browse/FLINK-22888
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.9.0
Reporter: Yue Ma

The pattern is like
Pattern.begin("start").where(records == "a")
.notNext("notNext").where(records == "b")
.within(Time.milliseconds(5)).
If there is only one event *"a"* within the 5 milliseconds, I think this *"a"* should be output as the correct result of the match the next time advanceTime is called. But in the actual operation of CEP, this *"a"* is treated as matched-timeout data.

{code:java}
// Reproducer test (generic type parameters restored)
@Test
public void testNoNextWithWindow() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // (Event, timestamp)
    DataStream<Event> input = env.fromElements(
            Tuple2.of(new Event(1, "start", 1.0), 5L),
            // last element for high final watermark
            Tuple2.of(new Event(5, "final", 5.0), 100L)
    ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {

        @Override
        public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
            return element.f1;
        }

        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
            return new Watermark(lastElement.f1 - 5);
        }
    }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
        @Override
        public Event map(Tuple2<Event, Long> value) throws Exception {
            return value.f0;
        }
    });

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("start");
        }
    }).notNext("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("middle");
        }
    }).within(Time.milliseconds(5L));

    DataStream<String> result = CEP.pattern(input, pattern).select(
            new PatternSelectFunction<Event, String>() {
                @Override
                public String select(Map<String, List<Event>> pattern) {
                    StringBuilder builder = new StringBuilder();
                    builder.append(pattern.get("start").get(0).getId());
                    return builder.toString();
                }
            }
    );

    List<String> resultList = new ArrayList<>();
    DataStreamUtils.collect(result).forEachRemaining(resultList::add);
    resultList.sort(String::compareTo);
    assertEquals(Arrays.asList("1"), resultList);
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)