Re: [Discuss] Planning Flink 1.14

2021-06-06 Thread Benchao Li
Hi all,

Thanks Xintong for bringing this up.

I would like to share some experience with the usage of Flink in our company
(ByteDance).

1. We started building our SQL platform in mid 2019, using the v1.9 blink
planner, and it's amazing.
We also added many internal features which were still missing in that
version, including DDL, computed columns,
a lot of internal formats and connectors, and some other planner changes.

2. In early 2020, we planned to upgrade to v1.10. Before we finished
cherry-picking internal commits to v1.10, we found
that v1.11 was going to be released soon. Hence we decided to upgrade to
v1.11.
By late 2020, we had almost finished the internal feature cherry-picking work. (It
took us so long because we were still adding new features
to our online version v1.9 at the same time.)

3. Now
Although we put a lot of work into reducing the overhead for our users
upgrading from v1.9 to v1.11, this process is still slow, because:
a) All the connector/format properties changed (although we have a tool that lets
them upgrade in one click, there is still a lot of learning cost; see the DDL
sketch below)
b) The checkpoint cannot be upgraded
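
For readers who have not made this migration yet, below is a minimal sketch of the
property-style change behind point a). The table name, topic, and broker address are
made up; the point is only the prefixed 'connector.type'/'format.type' keys used by
the 1.9/1.10 DDL versus the flat 'connector'/'format' keys of the new factories in
v1.11 (FLIP-122). It is an illustration, not our internal tool.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DdlMigrationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Legacy style (v1.9/v1.10): every option carries a 'connector.' / 'format.' prefix.
        String legacyDdl =
                "CREATE TABLE orders_legacy (order_id STRING, amount DOUBLE) WITH (\n"
                + "  'connector.type' = 'kafka',\n"
                + "  'connector.topic' = 'orders',\n"
                + "  'connector.properties.bootstrap.servers' = 'broker:9092',\n"
                + "  'format.type' = 'json'\n"
                + ")";
        System.out.println("Legacy DDL:\n" + legacyDdl);

        // New style (v1.11+ factories): flat option keys.
        String newDdl =
                "CREATE TABLE orders_new (order_id STRING, amount DOUBLE) WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'orders',\n"
                + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                + "  'format' = 'json'\n"
                + ")";
        // Requires flink-connector-kafka and flink-json on the classpath to actually run.
        tEnv.executeSql(newDdl);
    }
}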

4. Future
We have 5000+ online SQL jobs and hundreds of commits, so we do not plan to do
an upgrade in the short term.
However, v1.11 still lacks a lot of features, for example:
a) the new UDF type inference does not support aggregate functions
b) the FLIP-27 new source interface cannot be used in SQL
We may need to do a lot of cherry-picking onto our v1.11.

So, from our point of view, a longer release cycle and more fully finished
features would benefit us a lot.


JING ZHANG  wrote on Fri, Jun 4, 2021 at 6:02 PM:

> Hi all,
>
> @Xintong Song
> Thanks for reminding me, I will contact Jark to update the wiki page.
>
> Besides, I'd like to provide more inputs by sharing our experience about
> upgrading Internal version of Flink.
>
> Flink has been widely used in the production environment since 2018 in our
> company. Our internal version is behind the latest stable version of
> the community by about 1 year. We upgraded the internal Flink version to
> 1.10 in March last year, and we plan to upgrade directly to 1.13
> next month (skipping the 1.11 and 1.12 versions). We wish to use the latest
> version as soon as possible. However, in fact we only catch up with the
> community's latest stable release about once a year, because
> upgrading to a new version is a time-consuming process.
>
> I list detailed works as follows.
>
> a. Before releasing the new internal version
> 1) Required: Cherry-pick internal features to the new Flink branch. A few
> features need to be redeveloped based on the new branch's code base.
> BTW, this cost grows heavier and heavier since we maintain more and
> more internal features in our internal version.
> 2) Optional: Some internal connectors need to adapt to the new API
> 3) Required: Surrounding products need to be updated based on the new API, for
> example, the internal Flink SQL web development platform
> 4) Required: Regression tests
>
> b. After the release, encourage users to upgrade existing jobs (thousands of
> jobs) to the new version. Users need some time to:
> 1) Repackage jars for DataStream jobs
> 2) For critical jobs, users need to run jobs on the two versions at the
> same time for a while, and migrate to the new job only after comparing the
> data carefully.
> 3) Pure ETL SQL jobs are easy to bump up. But other Flink SQL jobs with
> stateful operators need extra effort because Flink SQL jobs do not
> support state compatibility yet.
>
> Best regards,
> JING ZHANG
>
> Prasanna kumar  wrote on Fri, Jun 4, 2021 at 2:27 PM:
>
> > Hi all,
> >
> > We are using Flink for our eventing system. Overall we are very happy
> with
> > the tech, documentation and community support and quick replies in mails.
> >
> > My experience with versions over the last year:
> >
> > We were working on 1.10 initially during our research phase, then we
> > stabilised on 1.11 as we moved on, but by the time we were about to get
> > into production 1.12 was released. As with all software and products,
> > there were bugs reported. So we waited till 1.12.2 was released and then
> > upgraded. Within a month of us doing it, 1.13 got released.
> >
> > Based on past experience, we waited till at least a couple of minor
> > versions (fixing bugs) got released before we moved onto a newer version.
> > Development happens at a rapid pace in Flink (which is good in
> > terms of features), but adopting and moving the production code to a newer
> > version 3-4 times a year is an onerous effort. For example, the memory
> > model was changed in one of the releases (there is good documentation).
> > But as a production user, to adopt a newer version, at least a month of
> > testing is required in a huge-scale environment. We also do not want to
> > be behind by more than 2 versions at any point of time.
> >
> > I personally feel 2 major releases a year, or at most a release every 5
> > months, is good.
> >
> > Thanks
> > Prasanna.
> >
> > On Fri, Jun 4, 2021 at 9:38 AM 

[jira] [Created] (FLINK-22895) Table common document misspelled

2021-06-06 Thread sujun (Jira)
sujun created FLINK-22895:
-

 Summary: Table common document misspelled
 Key: FLINK-22895
 URL: https://issues.apache.org/jira/browse/FLINK-22895
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.14.0
Reporter: sujun
 Attachments: image-2021-06-07-11-04-30-998.png

A variable name is misspelled: {{setting}} should be changed to {{settings}}.

The document url is: 
[https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/common/#create-a-tableenvironment]

 

!image-2021-06-07-11-04-30-998.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Add control mode for flink

2021-06-06 Thread 刘建刚
Thank you for the reply. I have checked the post you mentioned. The dynamic
config may be useful sometimes. But it is hard to keep data consistent in
Flink; for example, what happens if the dynamic config takes effect during
failover? Since dynamic config is something users want, maybe Flink can
support it in some way.

For the control mode, dynamic config is just one of the control modes. In
the Google doc, I have listed some other cases. For example, control events
may be generated in operators or external services. Besides the user's dynamic
config, the Flink system could support some common dynamic configuration, like
a qps limit, checkpoint control, and so on.

A good design is needed for the control mode structure. Based on that,
other control features can be added easily later, like changing the log level
while a job is running. In the end, Flink would not just process data, but also
interact with users and receive control events like a service.
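
To make the "user-predefined logic which supports config control" idea a bit more
concrete, here is a minimal, self-contained sketch. The static AtomicLong holder is
only a stand-in for whatever channel (e.g. the proposed RESTful request) would push a
new value into the running job; none of the names below are an existing Flink API or
part of the proposal.

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical user-side hook: records are filtered against a threshold that a
// control channel could update while the job keeps running.
public class ConfigurableThresholdFilter extends RichFilterFunction<Long> {

    // Stand-in for the dynamic config store; a control event would update this value.
    public static final AtomicLong THRESHOLD = new AtomicLong(100L);

    @Override
    public void open(Configuration parameters) {
        // Nothing to initialize in this sketch; a real implementation might
        // register itself with a control-event dispatcher here.
    }

    @Override
    public boolean filter(Long value) {
        // Each record is checked against the most recently pushed threshold.
        return value >= THRESHOLD.get();
    }
}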

Steven Wu  wrote on Fri, Jun 4, 2021 at 11:11 PM:

> I am not sure if we should solve this problem in Flink. This is more like
> a dynamic config problem that probably should be solved by some
> configuration framework. Here is one post from google search:
> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>
> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:
>
>> Hi everyone,
>>
>>   Flink jobs are always long-running. When the job is running, users
>> may want to control the job without stopping it. The reasons for control can
>> vary, for example:
>>
>>    1. Change the data processing logic, such as a filter condition.
>>    2. Send trigger events to move the progress forward.
>>    3. Define some tools to degrade the job, such as limiting input qps or
>>       sampling data.
>>    4. Change the log level to debug a current problem.
>>
>>   The common way to do this is to stop the job, make modifications, and
>> start the job again. It may take a long time to recover. In some situations,
>> stopping jobs is intolerable, for example when the job is related to money or
>> important activities. So we need some technology to control the running
>> job without stopping it.
>>
>>
>> We propose to add a control mode for Flink. A control mode based on a
>> RESTful interface is introduced first. It works in these steps:
>>
>>    1. The user predefines some logic which supports config control,
>>       such as a filter condition.
>>    2. Run the job.
>>    3. If the user wants to change the job's running logic, they just send a
>>       RESTful request with the corresponding config.
>>
>> Other control modes will also be considered in the future. More
>> introduction can be found in the doc
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> . If the community likes the proposal, more discussion is needed and a more
>> detailed design will be given later. Any suggestions and ideas are welcome.
>>
>>


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Jingsong Li
Thanks Yingjie for the great effort!

This is really helpful to Flink Batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao  wrote:

> Hi devs & users,
>
> The FLIP-148[1] has been released with Flink 1.13 and the final
> implementation has some differences compared with the initial proposal in
> the FLIP document. To avoid potential misunderstandings, I have updated the
> FLIP document[1] accordingly and I also drafted another document[2] which
> contains more implementation details.  FYI.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> [2]
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>
> Best,
> Yingjie
>
> Yingjie Cao  wrote on Thu, Oct 15, 2020 at 11:02 AM:
>
>> Hi devs,
>>
>> Currently, Flink adopts a hash-style blocking shuffle implementation
>> which writes data sent to different reducer tasks into separate files
>> concurrently. Compared to the sort-merge based approach, which writes those
>> data together into a single file and merges the small files into bigger ones,
>> the hash-based approach has several weak points when it comes to running
>> large-scale batch jobs:
>>
>>    1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
>>    current hash-based blocking shuffle implementation writes too many files
>>    concurrently, which puts high pressure on the file system, for example,
>>    maintenance of too many file metas, exhaustion of inodes or file
>>    descriptors. All of these can be potential stability issues. The sort-merge
>>    based blocking shuffle doesn't have this problem because for one result
>>    partition, only one file is written at a time.
>>    2. *Performance*: Large amounts of small shuffle files and random IO
>>    can influence shuffle performance a lot, especially for HDD (for SSD,
>>    sequential read is also important because of read-ahead and cache). For
>>    batch jobs processing massive data, a small amount of data per subpartition
>>    is common because of the high parallelism. Besides, data skew is another
>>    cause of small subpartition files. By merging the data of all subpartitions
>>    together in one file, more sequential reads can be achieved.
>>    3. *Resource*: For the current hash-based implementation, each
>>    subpartition needs at least one buffer. For large-scale batch shuffles, the
>>    memory consumption can be huge. For example, we need at least 320M of
>>    network memory per result partition if parallelism is set to 10000, and
>>    because of the huge network consumption, it is hard to configure the
>>    network memory for large-scale batch jobs, and sometimes the parallelism
>>    cannot be increased just because of insufficient network memory, which
>>    leads to a bad user experience.
>>
>> To improve Flink’s capability of running large scale batch jobs, we would
>> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> feedback is appreciated.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>
>> Best,
>> Yingjie
>>
>

-- 
Best, Jingsong Lee


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Yingjie Cao
Hi devs & users,

The FLIP-148[1] has been released with Flink 1.13 and the final
implementation has some differences compared with the initial proposal in
the FLIP document. To avoid potential misunderstandings, I have updated the
FLIP document[1] accordingly and I also drafted another document[2] which
contains more implementation details.  FYI.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2]
https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
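
In case it helps readers try this out, below is a rough sketch of switching the
sort-merge shuffle on from a local program. It assumes the 1.13 option key
taskmanager.network.sort-shuffle.min-parallelism (by default the threshold is very
high, so the sort-merge path is effectively off); please treat the exact key and
defaults as something to verify against the documents above rather than this sketch.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SortShuffleConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use the sort-merge implementation for blocking shuffles from parallelism 1 up.
        conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 1);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // batch jobs use blocking shuffles

        env.fromElements(1, 2, 3, 4, 5)
           .keyBy(i -> i % 2)                 // forces a shuffle between the two stages
           .reduce((a, b) -> a + b)
           .print();

        env.execute("sort-shuffle sketch");
    }
}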

Best,
Yingjie

Yingjie Cao  wrote on Thu, Oct 15, 2020 at 11:02 AM:

> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes data sent to different reducer tasks into separate files
> concurrently. Compared to the sort-merge based approach, which writes those
> data together into a single file and merges the small files into bigger ones,
> the hash-based approach has several weak points when it comes to running
> large-scale batch jobs:
>
>    1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
>    current hash-based blocking shuffle implementation writes too many files
>    concurrently, which puts high pressure on the file system, for example,
>    maintenance of too many file metas, exhaustion of inodes or file
>    descriptors. All of these can be potential stability issues. The sort-merge
>    based blocking shuffle doesn't have this problem because for one result
>    partition, only one file is written at a time.
>    2. *Performance*: Large amounts of small shuffle files and random IO
>    can influence shuffle performance a lot, especially for HDD (for SSD,
>    sequential read is also important because of read-ahead and cache). For
>    batch jobs processing massive data, a small amount of data per subpartition
>    is common because of the high parallelism. Besides, data skew is another
>    cause of small subpartition files. By merging the data of all subpartitions
>    together in one file, more sequential reads can be achieved.
>    3. *Resource*: For the current hash-based implementation, each
>    subpartition needs at least one buffer. For large-scale batch shuffles, the
>    memory consumption can be huge. For example, we need at least 320M of
>    network memory per result partition if parallelism is set to 10000, and
>    because of the huge network consumption, it is hard to configure the network
>    memory for large-scale batch jobs, and sometimes the parallelism cannot be
>    increased just because of insufficient network memory, which leads to a bad
>    user experience.
>
> To improve Flink’s capability of running large scale batch jobs, we would
> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> feedback is appreciated.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
>


[jira] [Created] (FLINK-22894) Window Top-N should allow n=1

2021-06-06 Thread David Anderson (Jira)
David Anderson created FLINK-22894:
--

 Summary: Window Top-N should allow n=1
 Key: FLINK-22894
 URL: https://issues.apache.org/jira/browse/FLINK-22894
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.13.1
Reporter: David Anderson


I tried to reimplement the Hourly Tips exercise from the DataStream training 
using Flink SQL. The objective of this exercise is to find the one taxi driver 
who earned the most in tips during each hour, and report that driver's driverId 
and the sum of their tips. 

This can be expressed as a window top-n query, where n=1, as in

{code:sql}
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY sumOfTips DESC) as rownum
  FROM (
    SELECT driverId, window_start, window_end, sum(tip) as sumOfTips
    FROM TABLE(
      TUMBLE(TABLE fares, DESCRIPTOR(startTime), INTERVAL '1' HOUR))
    GROUP BY driverId, window_start, window_end
  )
) WHERE rownum = 1;
{code}

 

This fails because the {{WindowRankOperatorBuilder}} insists on {{rankEnd > 1}}.
So, in other words, while it is possible to report the top 2 drivers, or the
driver in 2nd place, it's not possible to report only the top driver.

This appears to be an off-by-one error in the range checking.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22893) ResumeCheckpointManuallyITCase hangs on azure

2021-06-06 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22893:


 Summary: ResumeCheckpointManuallyITCase hangs on azure
 Key: FLINK-22893
 URL: https://issues.apache.org/jira/browse/FLINK-22893
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=2c7d57b9-7341-5a87-c9af-2cf7cc1a37dc=4382



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22892) Resuming Externalized Checkpoint (rocks, incremental, scale down) end-to-end test fails

2021-06-06 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22892:


 Summary: Resuming Externalized Checkpoint (rocks, incremental, 
scale down) end-to-end test fails
 Key: FLINK-22892
 URL: https://issues.apache.org/jira/browse/FLINK-22892
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=739e6eac-8312-5d31-d437-294c4d26fced=a68b8d89-50e9-5977-4500-f4fde4f57f9b=921

{code}
Jun 05 20:47:11 Running 'Resuming Externalized Checkpoint (rocks, incremental, 
scale down) end-to-end test'
Jun 05 20:47:11 
==
Jun 05 20:47:11 TEST_DATA_DIR: 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-11283890527
Jun 05 20:47:11 Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.14-SNAPSHOT-bin/flink-1.14-SNAPSHOT
Jun 05 20:47:11 Starting cluster.
Jun 05 20:47:12 Starting standalonesession daemon on host fv-az83-351.
Jun 05 20:47:13 Starting taskexecutor daemon on host fv-az83-351.
Jun 05 20:47:13 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:14 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:16 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:17 Waiting for Dispatcher REST endpoint to come up...
Jun 05 20:47:18 Dispatcher REST endpoint is up.
Jun 05 20:47:18 Running externalized checkpoints test,   with ORIGINAL_DOP=4 
NEW_DOP=2   and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true   
STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ...
Jun 05 20:47:25 Job (b84a167a07b862a9dbbcdc6a5969f75c) is running.
Jun 05 20:47:25 Waiting for job (b84a167a07b862a9dbbcdc6a5969f75c) to have at 
least 1 completed checkpoints ...
Jun 05 20:57:30 A timeout occurred waiting for job 
(b84a167a07b862a9dbbcdc6a5969f75c) to have at least 1 completed checkpoints .
Jun 05 20:57:30 Stopping job timeout watchdog (with pid=23016)
Jun 05 20:57:30 [FAIL] Test script contains errors.
Jun 05 20:57:30 Checking of logs skipped.
Jun 05 20:57:30 
Jun 05 20:57:30 [FAIL] 'Resuming Externalized Checkpoint (rocks, incremental, 
scale down) end-to-end test' failed after 10 minutes and 19 seconds! Test 
exited with exit code 1
Jun 05 20:57:30 

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22891) FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure

2021-06-06 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22891:


 Summary: 
FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase fails on azure
 Key: FLINK-22891
 URL: https://issues.apache.org/jira/browse/FLINK-22891
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18700=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=05b74a19-4ee4-5036-c46f-ada307df6cf0=8660

{code}
Jun 05 21:16:00 [ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 6.24 s <<< FAILURE! - in 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
Jun 05 21:16:00 [ERROR] 
testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase)
  Time elapsed: 5.015 s  <<< ERROR!
Jun 05 21:16:00 java.util.concurrent.TimeoutException
Jun 05 21:16:00 at 
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
Jun 05 21:16:00 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
Jun 05 21:16:00 at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(FineGrainedSlotManagerTestBase.java:121)
Jun 05 21:16:00 at 
org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.lambda$new$4(AbstractFineGrainedSlotManagerITCase.java:374)
Jun 05 21:16:00 at 
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:212)
Jun 05 21:16:00 at 
org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase$4.<init>(AbstractFineGrainedSlotManagerITCase.java:310)
Jun 05 21:16:00 at 
org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobAfterFree(AbstractFineGrainedSlotManagerITCase.java:308)
Jun 05 21:16:00 at 
org.apache.flink.runtime.resourcemanager.slotmanager.AbstractFineGrainedSlotManagerITCase.testResourceCanBeAllocatedForDifferentJobWithDeclarationBeforeSlotFree(AbstractFineGrainedSlotManagerITCase.java:262)
Jun 05 21:16:00 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Jun 05 21:16:00 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jun 05 21:16:00 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun 05 21:16:00 at java.lang.reflect.Method.invoke(Method.java:498)
Jun 05 21:16:00 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Jun 05 21:16:00 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Jun 05 21:16:00 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Jun 05 21:16:00 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Jun 05 21:16:00 at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
Jun 05 21:16:00 at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Jun 05 21:16:00 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Jun 05 21:16:00 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Jun 05 21:16:00 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Jun 05 21:16:00 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Jun 05 21:16:00 at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
Jun 05 21:16:00 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
Jun 05 21:16:00 at 

[jira] [Created] (FLINK-22890) Few tests fail in HiveTableSinkITCase

2021-06-06 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22890:


 Summary: Few tests fail in HiveTableSinkITCase
 Key: FLINK-22890
 URL: https://issues.apache.org/jira/browse/FLINK-22890
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Table SQL / Ecosystem
Affects Versions: 1.13.1
Reporter: Dawid Wysakowicz


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18692=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=420bd9ec-164e-562e-8947-0dacde3cec91=23189

{code}
Jun 05 01:22:13 [ERROR] Errors: 
Jun 05 01:22:13 [ERROR]   HiveTableSinkITCase.testBatchAppend:138 » Validation 
Could not execute CREATE ...
Jun 05 01:22:13 [ERROR]   
HiveTableSinkITCase.testDefaultSerPartStreamingWrite:156->testStreamingWrite:494
 » Validation
Jun 05 01:22:13 [ERROR]   
HiveTableSinkITCase.testHiveTableSinkWithParallelismInStreaming:100->testHiveTableSinkWithParallelismBase:108
 » Validation
Jun 05 01:22:13 [ERROR]   
HiveTableSinkITCase.testPartStreamingMrWrite:179->testStreamingWrite:423 » 
Validation
Jun 05 01:22:13 [ERROR]   
HiveTableSinkITCase.testStreamingSinkWithTimestampLtzWatermark:360->fetchRows:384
 » TestTimedOut
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22889) JdbcExactlyOnceSinkE2eTest hangs on azure

2021-06-06 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-22889:


 Summary: JdbcExactlyOnceSinkE2eTest hangs on azure
 Key: FLINK-22889
 URL: https://issues.apache.org/jira/browse/FLINK-22889
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.14.0
Reporter: Dawid Wysakowicz
 Fix For: 1.14.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18690=logs=ba53eb01-1462-56a3-8e98-0dd97fbcaab5=bfbc6239-57a0-5db0-63f3-41551b4f7d51=16658



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-06-06 Thread Steven Wu
> Converter function relies on the specific enumerator capabilities to set
the new start position (e.g.
fileSourceEnumerator.getEndTimestamp() and
kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)

I guess the premise is that a converter is for a specific tuple of
(upstream source, downstream source). We don't have to define generic
EndStateT and SwitchableEnumerator interfaces. That should work.

The benefit of defining EndStateT and SwitchableEnumerator interfaces is
probably promoting uniformity across sources that support a hybrid/switchable
source.

On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise  wrote:

> Hi Steven,
>
> Thank you for the thorough review of the PR and for bringing this back
> to the mailing list.
>
> All,
>
> I updated the FLIP-150 page to highlight aspects in which the PR
> deviates from the original proposal [1]. The goal would be to update
> the FLIP soon and bring it to a vote, as previously suggested offline
> by Nicholas.
>
> A few minor issues in the PR are outstanding and I'm working on test
> coverage for the recovery behavior, which should be completed soon.
>
> The dynamic position transfer needs to be concluded before we can move
> forward however.
>
> There have been various ideas, including the special
> "SwitchableEnumerator" interface, using enumerator checkpoint state or
> an enumerator interface extension to extract the end state.
>
> One goal in the FLIP is to "Reuse the existing Source connectors built
> with FLIP-27 without any change." and I think it is important to honor
> that goal given that fixed start positions do not require interface
> changes.
>
> Based on the feedback the following might be a good solution for
> runtime position transfer:
>
> * User supplies the optional converter function (not applicable for
> fixed positions).
> * Instead of relying on the enumerator checkpoint state [2], the
> converter function will be supplied with the current and next
> enumerator (source.createEnumerator).
> * Converter function relies on the specific enumerator capabilities to
> set the new start position (e.g.
> fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> * HybridSourceSplitEnumerator starts new underlying enumerator
>
> With this approach, there is no need to augment FLIP-27 interfaces and
> custom source capabilities are easier to integrate. Removing the
> mandate to rely on enumerator checkpoint state also avoids potential
> upgrade/compatibility issues.
>
> Thoughts?
>
> Thanks,
> Thomas
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> [2]
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
>
>
> On Tue, Jun 1, 2021 at 3:10 PM Steven Wu  wrote:
> >
> > Discussed the PR with Thomas offline. Thomas, please correct me if I
> > missed anything.
> >
> > Right now, the PR differs from the FLIP-150 doc regarding the converter.
> > * Current PR uses the enumerator checkpoint state type as the input for
> the
> > converter
> > * FLIP-150 defines a new EndStateT interface.
> > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > transition EndStateT doesn't have to be included in the upstream source
> > checkpoint state.
> >
> > Let's look at two use cases:
> > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5
> pm,
> > then Kafka source starts with initial position of 5 pm. In this case,
> there
> > is no need for converter or EndStateT since the starting time for Kafka
> > source is known and fixed.
> > 2) dynamic cutover time at 1 hour before now. This is useful when the
> > bootstrap of historic data takes a long time (like days or weeks) and we
> > don't know the exact time of cutover when a job is launched. Instead, we
> > are instructing the file source to stop when it gets close to live data.
> In
> > this case, hybrid source construction will specify a relative time (now
> - 1
> > hour), the EndStateT (of file source) will be resolved to an absolute
> time
> > for cutover. We probably don't need to include EndStateT (end timestamp)
> as
> > the file source checkpoint state. Hence, the separate EndStateT is
> probably
> > more desirable.
> >
> > We also discussed the converter for the Kafka source. Kafka source
> supports
> > different OffsetsInitializer impls (including
> TimestampOffsetsInitializer).
> > To support the dynamic cutover time (use case #2 above), we can plug in a
> > SupplierTimestampOffsetInitializer, where the starting timestamp is not
> set
> > during source/job construction. Rather it is a supplier model where the
> > starting timestamp value is set to the resolved absolute timestamp during
> > switch.
> >
> > Thanks,
> > Steven
> >
> >
> >
> > On Thu, May 20, 2021 at 8:59 PM Thomas Weise  wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for taking a look at 

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-06-06 Thread Thomas Weise
Hi Steven,

Thank you for the thorough review of the PR and for bringing this back
to the mailing list.

All,

I updated the FLIP-150 page to highlight aspects in which the PR
deviates from the original proposal [1]. The goal would be to update
the FLIP soon and bring it to a vote, as previously suggested offline
by Nicholas.

A few minor issues in the PR are outstanding and I'm working on test
coverage for the recovery behavior, which should be completed soon.

The dynamic position transfer needs to be concluded before we can move
forward however.

There have been various ideas, including the special
"SwitchableEnumerator" interface, using enumerator checkpoint state or
an enumerator interface extension to extract the end state.

One goal in the FLIP is to "Reuse the existing Source connectors built
with FLIP-27 without any change." and I think it is important to honor
that goal given that fixed start positions do not require interface
changes.

Based on the feedback the following might be a good solution for
runtime position transfer:

* User supplies the optional converter function (not applicable for
fixed positions).
* Instead of relying on the enumerator checkpoint state [2], the
converter function will be supplied with the current and next
enumerator (source.createEnumerator).
* Converter function relies on the specific enumerator capabilities to
set the new start position (e.g.
fileSourceEnumerator.getEndTimestamp() and
kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
* HybridSourceSplitEnumerator starts new underlying enumerator

With this approach, there is no need to augment FLIP-27 interfaces and
custom source capabilities are easier to integrate. Removing the
mandate to rely on enumerator checkpoint state also avoids potential
upgrade/compatibility issues.
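
For illustration only, here is a small self-contained sketch of the user-supplied
converter described above. The enumerator types and their methods are made-up
stand-ins for the file and Kafka enumerator capabilities mentioned
(getEndTimestamp / setTimestampOffsetsInitializer); they are not the FLIP-150 or
connector APIs.

import java.util.function.BiConsumer;

public class HybridSwitchSketch {

    // Stand-in for the capability the finishing file enumerator would expose.
    interface FileEnumeratorView {
        long getEndTimestamp();
    }

    // Stand-in for the capability the next Kafka enumerator would expose.
    interface KafkaEnumeratorView {
        void setStartTimestamp(long timestampMs);
    }

    public static void main(String[] args) {
        // The user supplies this converter when constructing the hybrid source;
        // at switch time HybridSourceSplitEnumerator would call it with the
        // current and the next enumerator (source.createEnumerator).
        BiConsumer<FileEnumeratorView, KafkaEnumeratorView> converter =
                (fileEnum, kafkaEnum) ->
                        kafkaEnum.setStartTimestamp(fileEnum.getEndTimestamp());

        // Toy enumerators to show the position hand-over.
        FileEnumeratorView fileEnum = () -> 1_622_937_600_000L; // end of bounded phase
        KafkaEnumeratorView kafkaEnum =
                ts -> System.out.println("Kafka source starts from timestamp " + ts);

        converter.accept(fileEnum, kafkaEnum);
    }
}

The point is only that the converter needs nothing beyond whatever capabilities the
two concrete enumerators already offer, which is what keeps the FLIP-27 interfaces
unchanged.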

Thoughts?

Thanks,
Thomas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
[2] 
https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281


On Tue, Jun 1, 2021 at 3:10 PM Steven Wu  wrote:
>
> Discussed the PR with Thomas offline. Thomas, please correct me if I
> missed anything.
>
> Right now, the PR differs from the FLIP-150 doc regarding the converter.
> * Current PR uses the enumerator checkpoint state type as the input for the
> converter
> * FLIP-150 defines a new EndStateT interface.
> It seems that the FLIP-150 approach of EndStateT is more flexible, as
> transition EndStateT doesn't have to be included in the upstream source
> checkpoint state.
>
> Let's look at two use cases:
> 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5 pm,
> then Kafka source starts with initial position of 5 pm. In this case, there
> is no need for converter or EndStateT since the starting time for Kafka
> source is known and fixed.
> 2) dynamic cutover time at 1 hour before now. This is useful when the
> bootstrap of historic data takes a long time (like days or weeks) and we
> don't know the exact time of cutover when a job is launched. Instead, we
> are instructing the file source to stop when it gets close to live data. In
> this case, hybrid source construction will specify a relative time (now - 1
> hour), the EndStateT (of file source) will be resolved to an absolute time
> for cutover. We probably don't need to include EndStateT (end timestamp) as
> the file source checkpoint state. Hence, the separate EndStateT is probably
> more desirable.
>
> We also discussed the converter for the Kafka source. Kafka source supports
> different OffsetsInitializer impls (including TimestampOffsetsInitializer).
> To support the dynamic cutover time (use case #2 above), we can plug in a
> SupplierTimestampOffsetInitializer, where the starting timestamp is not set
> during source/job construction. Rather it is a supplier model where the
> starting timestamp value is set to the resolved absolute timestamp during
> switch.
>
> Thanks,
> Steven
>
>
>
> On Thu, May 20, 2021 at 8:59 PM Thomas Weise  wrote:
>
> > Hi Nicholas,
> >
> > Thanks for taking a look at the PR!
> >
> > 1. Regarding switching mechanism:
> >
> > There has been previous discussion in this thread regarding the pros
> > and cons of how the switching can be exposed to the user.
> >
> > With fixed start positions, no special switching interface to transfer
> > information between enumerators is required. Sources are configured as
> > they would be when used standalone and just plugged into HybridSource.
> > I expect that to be a common use case. You can find an example for
> > this in the ITCase:
> >
> >
> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> >
> > For dynamic start position, the checkpoint state is used to transfer
> > information from old to new enumerator. An example for that can be
> > found here:
> >
> >
> > 

Job gets stuck with exactly once checkpointing and Kafka when initializing.

2021-06-06 Thread Anil Singh

Dear Flink Community,
When using exactly-once checkpointing
along with Kafka and connecting to Kafka using SASL_SSL, the job gets stuck with the
Kafka producer in the INITIALIZING state for around 10 minutes; after that it
functions normally across multiple checkpoints. The various configs
and observations follow.

Flink/Job config:  
VERSION: Flink 1.13.1 for both env and jars.

1. Single node with checkpointing in exactly-once mode and a checkpointing
interval of 2 minutes. (Tried with 10 minutes; still the same issue.)
2. The job has a Flink Kafka consumer for topic 1 as the source and a Flink
Kafka producer for topic 2 as the sink, with parallelism 4 for both source and sink.
3. The Kafka producer uses the FlinkKafkaProducer implementation in EXACTLY_ONCE
mode and connects to the broker using SASL_SSL, where the SSL certificate is self-signed.


Kafka config:
VERSION: 2.8 for the broker. For the Flink job, tried 2.4.1 (shipped with the Flink
Kafka connector) and 2.8.0.

1. Single broker, with both topics having 4 partitions.
2. Everything works correctly using the Kafka consumer commands when using SASL_SSL
mode.

Observations from debug logs:
Background: In the dev env with a PLAINTEXT connection everything was working
perfectly. When I changed PLAINTEXT to SASL_SSL I started getting timeout
exceptions, which were resolved after I increased max.block.ms from 6 to 9, but
then I started facing the delayed start issue.

1. About the error/exception:
I did not find any exception in the debug logs, but I noticed the following
pattern, which keeps repeating until successful initialization:
SSL_HANDSHAKE --> SCRAM_AUTHENTICATION --> FIND_COORDINATOR --> FETCH
PRODUCER_ID --> Transition from state INITIALIZING to READY --> CLOSE PRODUCER

The producer keeps repeating the above steps until the following pattern occurs:
SSL_HANDSHAKE --> SCRAM_AUTHENTICATION --> FIND_COORDINATOR --> FETCH
PRODUCER_ID --> Transition from state INITIALIZING to READY --> READY to
IN_TRANSACTION.

Also, while the Kafka producer/sink is stuck in the INITIALIZING state, if I cancel
the job the Kafka sink operator gets stuck and I get the following error: "Task did
not exit gracefully within 180 + seconds." And the task manager crashes.

2. Turning off SSL and using SASL_PLAINTEXT - When connecting to the Kafka broker
using SASL_PLAINTEXT mode, the Kafka producer gets initialized and starts
processing data in 50-60 secs.

3. Disabling checkpointing - When checkpointing is disabled and SASL_SSL or
SASL_PLAINTEXT mode is used for Kafka, the Kafka producer gets initialized
immediately, but the Kafka consumer or sink takes 10-15 secs to initialize, and then
the app operators start processing data.

4. I wrote a producer directly using the KafkaProducer from the Kafka library and
added transactions to it. My observation was that the kafkaProducer.initTransactions()
method takes around 30 sec to execute, after which it starts publishing data to the
Kafka topic.

5. The same pattern of delayed start seems to appear in case of a crash and automatic
restart of the job.

6. No exception in the Flink Dashboard UI for the job.

PFA, a copy of part of log.
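
For reference, below is a minimal sketch of the kind of producer setup described
above (Flink 1.13 FlinkKafkaProducer in EXACTLY_ONCE mode over SASL_SSL). The broker
address, credentials, truststore path, SASL mechanism, and topic names are
placeholders, and the property values are only illustrative; this is not a diagnosis
of the delay.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSaslSslSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(120_000); // 2 minute checkpoint interval, as described above

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9093");          // placeholder
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");           // placeholder mechanism
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"user\" password=\"secret\";");    // placeholders
        props.setProperty("ssl.truststore.location", "/path/to/truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        // Must not exceed the broker's transaction.max.timeout.ms for EXACTLY_ONCE.
        props.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> serializer =
                (element, timestamp) ->
                        new ProducerRecord<>("topic2", element.getBytes(StandardCharsets.UTF_8));

        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "topic2", serializer, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c").addSink(sink).setParallelism(4);
        env.execute("exactly-once SASL_SSL sink sketch");
    }
}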



[jira] [Created] (FLINK-22888) Matches results may be wrong when using notNext as the last part of the pattern with Window

2021-06-06 Thread Yue Ma (Jira)
Yue Ma created FLINK-22888:
--

 Summary: Matches results may be wrong when using notNext as the 
last part of the pattern with Window
 Key: FLINK-22888
 URL: https://issues.apache.org/jira/browse/FLINK-22888
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.9.0
Reporter: Yue Ma


The pattern is like:

Pattern.begin("start").where(record == "a")
    .notNext("notNext").where(record == "b")
    .within(Time.milliseconds(5))

If there is only one event *"a"* within 5 milliseconds, I think this *"a"* should
be output as a correct match result the next time the event time is advanced
(advanceTime).

But in the actual operation of CEP, this *"a"* is treated as match-timeout data:
{code:java}
@Test
public void testNoNextWithWindow() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // (Event, timestamp)
    DataStream<Event> input = env.fromElements(
            Tuple2.of(new Event(1, "start", 1.0), 5L),

            // last element for high final watermark
            Tuple2.of(new Event(5, "final", 5.0), 100L)
    ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {

        @Override
        public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
            return element.f1;
        }

        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
            return new Watermark(lastElement.f1 - 5);
        }

    }).map(new MapFunction<Tuple2<Event, Long>, Event>() {

        @Override
        public Event map(Tuple2<Event, Long> value) throws Exception {
            return value.f0;
        }
    });

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("start");
        }
    }).notNext("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("middle");
        }
    }).within(Time.milliseconds(5L));

    DataStream<String> result = CEP.pattern(input, pattern).select(
            new PatternSelectFunction<Event, String>() {
                @Override
                public String select(Map<String, List<Event>> pattern) {
                    StringBuilder builder = new StringBuilder();
                    builder.append(pattern.get("start").get(0).getId());
                    return builder.toString();
                }
            }
    );

    List<String> resultList = new ArrayList<>();

    DataStreamUtils.collect(result).forEachRemaining(resultList::add);

    resultList.sort(String::compareTo);

    assertEquals(Arrays.asList("1"), resultList);
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)