[jira] [Created] (FLINK-34698) CLONE - Deploy Python artifacts to PyPI

2024-03-15 Thread lincoln lee (Jira)
lincoln lee created FLINK-34698:
---

 Summary: CLONE - Deploy Python artifacts to PyPI
 Key: FLINK-34698
 URL: https://issues.apache.org/jira/browse/FLINK-34698
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin
Assignee: Jing Ge


The release manager should create a PyPI account and ask the PMC to add this account 
to the pyflink collaborator list with the Maintainer role (the PyPI admin account info 
can be found here; NOTE: only visible to PMC members) in order to deploy the Python 
artifacts to PyPI. The artifacts can be uploaded using 
twine ([https://pypi.org/project/twine/]). To install twine, run:
{code:java}
pip install --upgrade twine==1.12.0
{code}
Download the Python artifacts from dist.apache.org and upload them to pypi.org:
{code:java}
svn checkout https://dist.apache.org/repos/dist/dev/flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
cd flink-${RELEASE_VERSION}-rc${RC_NUM}

cd python

# upload wheels
for f in *.whl; do twine upload --repository-url https://upload.pypi.org/legacy/ $f $f.asc; done

# upload source packages
twine upload --repository-url https://upload.pypi.org/legacy/ apache-flink-libraries-${RELEASE_VERSION}.tar.gz apache-flink-libraries-${RELEASE_VERSION}.tar.gz.asc

twine upload --repository-url https://upload.pypi.org/legacy/ apache-flink-${RELEASE_VERSION}.tar.gz apache-flink-${RELEASE_VERSION}.tar.gz.asc
{code}
If the upload failed or was incorrect for some reason (e.g. a network transmission 
problem), you need to delete the already-uploaded release package of the same version 
(if it exists), rename the artifact to 
{{apache-flink-${RELEASE_VERSION}.post0.tar.gz}}, and then re-upload.

(!) Note: re-uploading to pypi.org should be avoided as much as possible because 
it causes irreparable problems. If it happens, users cannot install 
the apache-flink package by explicitly specifying the original package version, i.e. the 
command "pip install apache-flink==${RELEASE_VERSION}" will fail. 
Instead, they have to run "pip install apache-flink" or "pip install 
apache-flink==${RELEASE_VERSION}.post0" to install the apache-flink package.

 

h3. Expectations
 * Python artifacts released and indexed in the 
[PyPI|https://pypi.org/project/apache-flink/] Repository



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34701) CLONE - Publish the Dockerfiles for the new release

2024-03-15 Thread lincoln lee (Jira)
lincoln lee created FLINK-34701:
---

 Summary: CLONE - Publish the Dockerfiles for the new release
 Key: FLINK-34701
 URL: https://issues.apache.org/jira/browse/FLINK-34701
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin
Assignee: Jing Ge


Note: the official Dockerfiles fetch the binary distribution of the target 
Flink version from an Apache mirror. After publishing the binary release 
artifacts, mirrors can take some hours to start serving the new artifacts, so 
you may want to wait to do this step until you are ready to continue with the 
"Promote the release" steps in the follow-up Jira.

Follow the [release instructions in the flink-docker 
repo|https://github.com/apache/flink-docker#release-workflow] to build the new 
Dockerfiles and send an updated manifest to Docker Hub so the new images are 
built and published.

 

h3. Expectations

 * Dockerfiles in [flink-docker|https://github.com/apache/flink-docker] updated 
for the new Flink release, and a pull request opened on the Docker official-images 
repository with an updated manifest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34699) CLONE - Deploy artifacts to Maven Central Repository

2024-03-15 Thread lincoln lee (Jira)
lincoln lee created FLINK-34699:
---

 Summary: CLONE - Deploy artifacts to Maven Central Repository
 Key: FLINK-34699
 URL: https://issues.apache.org/jira/browse/FLINK-34699
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin
Assignee: Jing Ge


Use the [Apache Nexus repository|https://repository.apache.org/] to release the 
staged binary artifacts to the Maven Central repository. In the Staging 
Repositories section, find the relevant release candidate orgapacheflink-XXX 
entry and click Release. Drop all other release candidates that are not being 
released.
h3. Deploy source and binary releases to dist.apache.org

Copy the source and binary releases from the dev repository to the release 
repository at [dist.apache.org|http://dist.apache.org/] using Subversion.
{code:java}
$ svn move -m "Release Flink ${RELEASE_VERSION}" \
    https://dist.apache.org/repos/dist/dev/flink/flink-${RELEASE_VERSION}-rc${RC_NUM} \
    https://dist.apache.org/repos/dist/release/flink/flink-${RELEASE_VERSION}
{code}
(Note: Only PMC members have access to the release repository. If you do not 
have access, ask on the mailing list for assistance.)
h3. Remove old release candidates from [dist.apache.org|http://dist.apache.org/]

Remove the old release candidates from 
[https://dist.apache.org/repos/dist/dev/flink] using Subversion.
{code:java}
$ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates
$ cd flink
$ svn remove flink-${RELEASE_VERSION}-rc*
$ svn commit -m "Remove old release candidates for Apache Flink ${RELEASE_VERSION}"
{code}
 

h3. Expectations
 * Maven artifacts released and indexed in the [Maven Central 
Repository|https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.flink%22]
 (usually takes about a day to show up)
 * Source & binary distributions available in the release repository of 
[https://dist.apache.org/repos/dist/release/flink/]
 * Dev repository [https://dist.apache.org/repos/dist/dev/flink/] is empty
 * Website contains links to the new release binaries and sources on the download page
 * (for minor version updates) the front page references the correct new major 
release version and directs to the correct link



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34700) CLONE - Create Git tag and mark version as released in Jira

2024-03-15 Thread lincoln lee (Jira)
lincoln lee created FLINK-34700:
---

 Summary: CLONE - Create Git tag and mark version as released in 
Jira
 Key: FLINK-34700
 URL: https://issues.apache.org/jira/browse/FLINK-34700
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin
Assignee: Jing Ge


Create and push a new Git tag for the released version by copying the tag for 
the final release candidate, as follows:
{code:java}
$ git tag -s "release-${RELEASE_VERSION}" refs/tags/${TAG}^{} -m "Release Flink ${RELEASE_VERSION}"
$ git push <remote> refs/tags/release-${RELEASE_VERSION}
{code}
In JIRA, inside [version 
management|https://issues.apache.org/jira/plugins/servlet/project-config/FLINK/versions],
 hover over the current release and a settings menu will appear. Click Release, 
and select today’s date.

(Note: Only PMC members have access to the project administration. If you do 
not have access, ask on the mailing list for assistance.)

If PRs have been merged to the release branch after the last release 
candidate was tagged, make sure that the corresponding Jira tickets have the 
correct Fix Version set.

 

h3. Expectations
 * Release tagged in the source code repository
 * Release version finalized in JIRA. (Note: Not all committers have 
administrator access to JIRA. If you end up getting permissions errors ask on 
the mailing list for assistance)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34697) Finalize release 1.19.0

2024-03-15 Thread lincoln lee (Jira)
lincoln lee created FLINK-34697:
---

 Summary: Finalize release 1.19.0
 Key: FLINK-34697
 URL: https://issues.apache.org/jira/browse/FLINK-34697
 Project: Flink
  Issue Type: New Feature
Reporter: Sergey Nuyanzin
Assignee: Jing Ge


Once the release candidate has been reviewed and approved by the community, the 
release should be finalized. This involves the final deployment of the release 
candidate to the release repositories, merging of the website changes, etc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-15 Thread Jeyhun Karimov
Hi Benchao,

Thanks for your comments.

> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> if we cannot have a good greatest common divisor, like 127 + 128,
> could we just utilize one side's pre-partitioned attribute, and let
> another side just do the shuffle?


There are two cases we need to consider:

1. Static Partition (no partitions are added during the query execution) is
enabled AND both sources implement "SupportsPartitionPushdown"

In this case, we are sure that no new partitions will be added at runtime.
So, we have a chance to equalize both sources' partitions and parallelism, IFF
both sources implement the "SupportsPartitionPushdown" interface.
To achieve this, we first fetch the existing partitions from source1
(say p_s1) and from source2 (say p_s2).
Then, we find the intersection of these two partition sets (say
p_intersect) and push down these partitions:

SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
only specific partitions are read
SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
with filtered partitions

Lastly, we need to change the parallelism of 1) source1, 2) source2, and 3)
all of their downstream operators up to (and including) their first common
ancestor (e.g., the join) to be equal to the number of partitions (the size of
p_intersect). A small sketch of this case is included at the end of this answer.

2. All other cases

In all other cases, the parallelism of both sources and their downstream
operators up to their common ancestor would be equal to MIN(p_s1, p_s2).
That is, the minimum of the partition sizes of source1 and source2 will be
selected as the parallelism.
Coming back to your example, if source1 parallelism is 127 and source2
parallelism is 128, then we will first check the partition size of source1
and source2.
Say partition size of source1 is 100 and partition size of source2 is 90.
Then, we would set the parallelism for source1, source2, and all of their
downstream operators until (and including) the join operator
to 90 (min(100, 90)).
We also plan to implement a cost-based decision instead of the rule-based
one explained above (the MIN rule).
One possible result of the cost-based estimation is to keep the partitions
on one side and perform the shuffling on the other source.
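
To make case 1 concrete, here is a rough planner-side sketch (just my
illustration, not code from the FLIP): SupportsPartitionPushDown is the
existing source ability, while SupportsPartitioning/applyPartitionedRead is
only proposed by this FLIP, so that call is left as a comment.

```
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class PrePartitionedSourceAlignment {

    // Returns the parallelism to apply to source1, source2 and all downstream
    // operators up to and including their first common ancestor (e.g. the join).
    static int alignStaticPartitions(
            SupportsPartitionPushDown source1,
            SupportsPartitionPushDown source2,
            List<Map<String, String>> partitionsOfSource1,   // p_s1
            List<Map<String, String>> partitionsOfSource2) { // p_s2

        // p_intersect: partitions present in both sources
        List<Map<String, String>> intersection = new ArrayList<>(partitionsOfSource1);
        intersection.retainAll(partitionsOfSource2);

        // Make sure only the common partitions are read on either side.
        source1.applyPartitions(intersection);
        source2.applyPartitions(intersection);

        // The proposed FLIP-434 call would go here (not an existing interface):
        // ((SupportsPartitioning) source1).applyPartitionedRead(intersection);
        // ((SupportsPartitioning) source2).applyPartitionedRead(intersection);

        return intersection.size();
    }
}
```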



> 2. In our current shuffle remove design (FlinkExpandConversionRule),
> we don't consider parallelism, we just remove unnecessary shuffles
> according to the distribution columns. After this FLIP, the
> parallelism may be bundled with source's partitions, then how will
> this optimization accommodate with FlinkExpandConversionRule, will you
> also change downstream operator's parallelisms if we want to also
> remove subsequent shuffles?



- From my understanding of FlinkExpandConversionRule, its removal logic is
agnostic to operator parallelism.
So, if FlinkExpandConversionRule decides to remove a shuffle operation,
then this FLIP will search for another possible shuffle (the one closest to the
source) to remove.
If there is such an opportunity, this FLIP will remove that shuffle. So,
from my understanding, FlinkExpandConversionRule and this optimization rule
can work together safely.
Please correct me if I misunderstood your question.



> Regarding the new optimization rule, have you also considered to allow
> some non-strict mode like FlinkRelDistribution#requireStrict? For
> example, source is pre-partitioned by a, b columns, if we are
> consuming this source, and do a aggregate on a, b, c, can we utilize
> this optimization?


- Good point. Yes, there are some cases in which non-strict mode will apply.
For example:

- the pre-partitioned columns and the aggregate columns are the same but in a
different order (e.g., the source is pre-partitioned w.r.t. a,b and the
aggregate has GROUP BY b,a)
- the columns of the Exchange operator are a list-prefix of the source's
pre-partitioned columns (e.g., the source is pre-partitioned w.r.t. a,b,c and
the Exchange's partition columns are a,b); see the small check sketched below
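
A tiny sketch of that list-prefix check (a hypothetical helper to illustrate
the condition, not planner code):

```
import java.util.List;

final class PrefixCheck {
    // True if the Exchange's partition columns are a prefix of the source's
    // pre-partitioned columns, e.g. Exchange (a, b) against source (a, b, c).
    static boolean isListPrefix(List<String> exchangeColumns, List<String> sourceColumns) {
        if (exchangeColumns.size() > sourceColumns.size()) {
            return false;
        }
        return sourceColumns.subList(0, exchangeColumns.size()).equals(exchangeColumns);
    }
}
```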

Please let me know if the above answers your questions or if you have any
other comments.

Regards,
Jeyhun

On Thu, Mar 14, 2024 at 12:48 PM Benchao Li  wrote:

> Thanks Jeyhun for bringing up this discussion, it is really exciting,
> +1 for the general idea.
>
> We also introduced a similar concept in Flink Batch internally to cope
> with bucketed tables in Hive, it is a very important improvement.
>
> > One thing to note is that for join queries, the parallelism of each join
> > source might be different. This might result in
> > inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
> > different mappings of partitions to source operators).
> > Therefore, it is the job of planner to detect this and adjust the
> > parallelism. With that having in mind,
> > the rest (how the split assigners perform) is consistent among many
> > sources.
>
> Could you elaborate a little more on this. I added my two cents here
> about this part:
> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> if we cannot have a 

Re: [jira] [Created] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-15 Thread Galen Warren
Yes, the issue is recoverability. However, there is one thing you can do.
Create a separate bucket in GCP for temporary files and initialize the
filesystem configuration (via FileSystem.initialize) with
*gs.writer.temporary.bucket.name* set to the name of that bucket.
This will cause the GSRecoverableWriter to write intermediate/temporary
files to that bucket instead of the "real" bucket. Then, you can apply a TTL
lifecycle policy to the
temporary bucket to have files be deleted after whatever TTL you want.
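
For reference, a minimal sketch of that setup (the bucket name is just an
example; the initialization has to run before the file system is first used,
e.g. at the start of main):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class GsTempBucketSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Route the GS RecoverableWriter's intermediate/temporary blobs to a
        // dedicated bucket so a TTL lifecycle rule can clean them up.
        conf.setString("gs.writer.temporary.bucket.name", "my-flink-temp-bucket");
        FileSystem.initialize(conf, null); // no plugin manager in this sketch

        // ... build and execute the Flink job as usual ...
    }
}
```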

If you try to recover to a check/savepoint farther back in time than that
TTL interval, the recovery will probably fail, but this will let you dial
in whatever recoverability period you want, i.e. longer (at higher storage
cost) or shorter (at lower storage cost).


On Fri, Mar 15, 2024 at 10:04 AM Simon-Shlomo Poil (Jira) 
wrote:

> Simon-Shlomo Poil created FLINK-34696:
> -
>
>  Summary: GSRecoverableWriterCommitter is generating excessive
> data blobs
>  Key: FLINK-34696
>  URL: https://issues.apache.org/jira/browse/FLINK-34696
>  Project: Flink
>   Issue Type: Bug
>   Components: Connectors / FileSystem
> Reporter: Simon-Shlomo Poil
>
>
> In the "composeBlobs" method of
> org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter
> many small blobs are combined to generate a final single blob using the
> google storage compose method. This compose action is performed iteratively
> each time composing  the resulting blob from the previous step with 31 new
> blobs until there are not remaining blobs. When the compose action is
> completed the temporary blobs are removed.
>
> This unfortunately leads to significant excessive use of data storage
> (which for google storage is a rather costly situation).
>
> *Simple example*
> We have 64 blobs each 1 GB; i.e. 64 GB
> 1st step: 32 blobs are composed into one blob; i.e. now 64 GB + 32 GB = 96
> GB
> 2nd step: The 32 GB blob from previous step is composed with 31 blobs; now
> we have 64 GB + 32 GB + 63 GB = 159 GB
> 3rd step: The last remaining blob is composed with the blob from the
> previous step; now we have: 64 GB + 32 GB + 63 GB + 64 GB = 223 GB
> I.e. in order to combine 64 GB of data we had an overhead of 159 GB.
>
> *Why is this big issue?*
> With large amount of data the overhead becomes significant. With TiB of
> data we experienced peaks of PiB leading to unexpected high costs, and
> (maybe unrelated) frequent storage exceptions thrown by the Google Storage
> library.
>
> *Suggested solution:*
> When the blobs are composed together they should be deleted to not
> duplicate data.
> Maybe this has implications for recoverability?
>
>
>
>
>
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)
>


[jira] [Created] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-15 Thread Simon-Shlomo Poil (Jira)
Simon-Shlomo Poil created FLINK-34696:
-

 Summary: GSRecoverableWriterCommitter is generating excessive data 
blobs
 Key: FLINK-34696
 URL: https://issues.apache.org/jira/browse/FLINK-34696
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Simon-Shlomo Poil


In the "composeBlobs" method of 
org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter
many small blobs are combined to generate a final single blob using the google 
storage compose method. This compose action is performed iteratively each time 
composing  the resulting blob from the previous step with 31 new blobs until 
there are not remaining blobs. When the compose action is completed the 
temporary blobs are removed.
 
This unfortunately leads to significantly excessive use of data storage (which 
for Google Storage is rather costly). 
 
*Simple example*
We have 64 blobs of 1 GB each, i.e. 64 GB.
1st step: 32 blobs are composed into one blob; i.e. now 64 GB + 32 GB = 96 GB
2nd step: the 32 GB blob from the previous step is composed with 31 blobs; now we 
have 64 GB + 32 GB + 63 GB = 159 GB
3rd step: the last remaining blob is composed with the blob from the previous 
step; now we have 64 GB + 32 GB + 63 GB + 64 GB = 223 GB
I.e. in order to combine 64 GB of data we had an overhead of 159 GB. 
 
*Why is this a big issue?*
With large amounts of data the overhead becomes significant. With TiB of data we 
experienced peaks of PiB, leading to unexpectedly high costs and (maybe 
unrelated) frequent storage exceptions thrown by the Google Storage library.
 
*Suggested solution:* 
When blobs have been composed together, they should be deleted so that data is 
not duplicated.
Maybe this has implications for recoverability?
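
A rough sketch of this approach (assuming the Google Cloud Storage Java client 
and its 32-source compose limit; this illustrates the suggestion and is not the 
current Flink implementation):
{code:java}
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.util.ArrayList;
import java.util.List;

public class ComposeWithEagerDelete {

    private static final int MAX_COMPOSE_SOURCES = 32;

    // Compose 'parts' (in order) into 'target', deleting each batch of source
    // blobs as soon as it has been folded in, so intermediate data is not kept
    // around until the very end. Note: this eager deletion is exactly what may
    // have implications for recoverability.
    static void composeBlobs(Storage storage, String bucket, List<String> parts, String target) {
        List<String> remaining = new ArrayList<>(parts);
        boolean haveIntermediate = false;
        while (!remaining.isEmpty()) {
            List<String> batch = new ArrayList<>();
            if (haveIntermediate) {
                batch.add(target); // previous intermediate result, composed in place
            }
            while (batch.size() < MAX_COMPOSE_SOURCES && !remaining.isEmpty()) {
                batch.add(remaining.remove(0));
            }
            storage.compose(Storage.ComposeRequest.newBuilder()
                    .setTarget(BlobInfo.newBuilder(bucket, target).build())
                    .addSource(batch)
                    .build());
            // Delete the consumed source blobs right away (but never the target).
            for (String name : batch) {
                if (!name.equals(target)) {
                    storage.delete(BlobId.of(bucket, name));
                }
            }
            haveIntermediate = true;
        }
    }
}
{code}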
 
 
 
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34695) Move Flink's CI docker container into a public repo

2024-03-15 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34695:
-

 Summary: Move Flink's CI docker container into a public repo
 Key: FLINK-34695
 URL: https://issues.apache.org/jira/browse/FLINK-34695
 Project: Flink
  Issue Type: Improvement
  Components: Build System / CI
Affects Versions: 1.18.1, 1.19.0, 1.20.0
Reporter: Matthias Pohl


Currently, Flink's CI (GitHub Actions and Azure Pipelines) uses a container to 
run the logic. The intention behind this is to have a way to mimic the CI setup 
locally as well.

The current Docker image is maintained from the 
[zentol/flink-ci-docker|https://github.com/zentol/flink-ci-docker] fork (owned 
by [~chesnay]) of 
[flink-ci/flink-ci-docker|https://github.com/flink-ci/flink-ci-docker] (owned 
by Ververica), which is not ideal. We should move this repo into an Apache-owned 
repository.

Additionally, there is no workflow that automatically pushes the image to a 
registry from where it can be used. Instead, the images were pushed to personal 
Docker Hub repos in the past (rmetzger, chesnay, mapohl). This is also not 
ideal. We should set up a public repo and a GHA workflow that pushes the image 
to that repo.

Questions to answer here:
# Where shall the Docker image code be located?
# Which Docker registry should be used?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34694) Delete num of associations for streaming outer join

2024-03-15 Thread Roman Boyko (Jira)
Roman Boyko created FLINK-34694:
---

 Summary: Delete num of associations for streaming outer join
 Key: FLINK-34694
 URL: https://issues.apache.org/jira/browse/FLINK-34694
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.18.1, 1.19.0, 1.17.2, 1.16.3
Reporter: Roman Boyko
 Attachments: image-2024-03-15-19-51-29-282.png, 
image-2024-03-15-19-52-24-391.png

Currently, in StreamingJoinOperator (non-window), in case of OUTER JOIN the 
OuterJoinRecordStateView is used to store an additional field: the number of 
associations for every record. This leads to storing additional Tuple2 and 
Integer data for every record in the outer state.

This functionality is used only for sending:
 * -D[nullPaddingRecord] in case of the first Accumulate record
 * +I[nullPaddingRecord] in case of the last Revoke record

The overhead of storing this additional data and updating the association 
counter can be avoided by checking the input state for these events.

 

The proposed solution can be found here - 
[https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]

 

According to the Nexmark q20 benchmark (changed to OUTER JOIN), it could increase 
performance by up to 20%:
 * Before:

!image-2024-03-15-19-52-24-391.png!
 * After:

!image-2024-03-15-19-51-29-282.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34693) Memory leak in KafkaWriter

2024-03-15 Thread Stefan Richter (Jira)
Stefan Richter created FLINK-34693:
--

 Summary: Memory leak in KafkaWriter
 Key: FLINK-34693
 URL: https://issues.apache.org/jira/browse/FLINK-34693
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.18.1, 1.19.0, 1.18.0
Reporter: Stefan Richter
 Attachments: image-2024-03-15-10-30-08-280.png, 
image-2024-03-15-10-30-50-902.png

KafkaWriter keeps instances of {{TwoPhaseCommitProducer}} in a Deque of 
closeables ({{producerCloseables}}). We only add instances to the queue (for 
each txn?), but never remove them, so they can never be GC'ed.

From the heap dump:

!image-2024-03-15-10-30-50-902.png!
!image-2024-03-15-10-30-08-280.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Chained rebalance() guarantees

2024-03-15 Thread Jan Lukavský

Hi,

while implementing a new transform for Apache Beam, we hit some questions 
about what guarantees the DataSet#rebalance() method has in terms of 
chaining. According to [1], there was some suboptimality in the execution of a 
chained rebalance with an intermediate mapping operation. As a result, the 
implementation was changed to use optimizer hints [2]. Unfortunately, it 
seems that this hint (HINT_SHIP_STRATEGY_REPARTITION) can get ignored 
(i.e. the data is not redistributed among workers, but is processed 
sequentially by a single worker). Beam has some primitives that rely 
heavily on the guarantee that Reshuffle really does redistribute the data.


My question therefore is: what would be the best strategy to ensure the 
required semantics, i.e. that we enforce redistribution of the data, even in 
the case where there is a chain of Reshuffle -> Map -> Reshuffle? One 
option seems to be groupBy -> groupReduce(identity) (sketched below), which has 
the correct semantics but is inefficient. An alternative might be to use the 
hint HINT_SHIP_STRATEGY_REPARTITION_HASH, but it remains unclear whether this 
enforces repartitioning in all required cases. Would anyone have any 
suggestions?
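
For reference, a minimal sketch of the groupBy -> groupReduce(identity) 
workaround mentioned above (the hashCode-based key is only an illustrative 
choice; any key that spreads records across workers would do):

```
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class ForcedRedistribution {
    static DataSet<String> forceRedistribute(DataSet<String> input) {
        return input
                // grouping forces a hash-partitioning (repartition) step
                .groupBy(new KeySelector<String, Integer>() {
                    @Override
                    public Integer getKey(String value) {
                        return value.hashCode();
                    }
                })
                // identity group-reduce: emit every record unchanged
                .reduceGroup(new GroupReduceFunction<String, String>() {
                    @Override
                    public void reduce(Iterable<String> values, Collector<String> out) {
                        for (String value : values) {
                            out.collect(value);
                        }
                    }
                });
    }
}
```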


Thanks,

 Jan

[1] https://lists.apache.org/thread/tsbyzlk8p7m59qbgtgfm4rl0cx1rnz1j

[2] https://github.com/apache/beam/pull/11530



[jira] [Created] (FLINK-34692) Update Flink website to point to the new Flink CDC “Get Started” page

2024-03-15 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-34692:
-

 Summary: Update Flink website to point to the new Flink CDC “Get 
Started” page 
 Key: FLINK-34692
 URL: https://issues.apache.org/jira/browse/FLINK-34692
 Project: Flink
  Issue Type: Sub-task
Reporter: Qingsheng Ren






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Unaligned checkpoint blocked by long Async operation

2024-03-15 Thread Gyula Fóra
Posting this to dev as well...

Thanks Zakelly,
Sounds like a solution could be to add a new, different version of yield
that would actually yield to the checkpoint barrier too. That way operator
implementations could decide whether any state modification may or may not
have happened and could optionally allow a checkpoint to be taken in the
"middle of record processing".
Gyula

On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan  wrote:

> Hi Gyula,
>
> Processing checkpoint halfway through `processElement` is problematic. The
> current element will not be included in the input in-flight data, and we
> cannot assume it has taken effect on the state by user code. So the best
> way is to treat `processElement` as an 'atomic' operation. I guess that's
> why the priority of the cp barrier is set low.
> However, the AsyncWaitOperator is a special case where we know the element
> blocked at `addToWorkQueue` has not started triggering the userFunction.
> Thus I'd suggest putting the element in the queue when the cp barrier
> comes, and taking a snapshot of the whole queue afterwards. The problem
> will be solved. But this approach also involves some code modifications on
> the mailbox executor.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra  wrote:
>
>> Thank you for the detailed analysis Zakelly.
>>
>> I think we should consider whether yield should process checkpoint
>> barriers because this puts quite a serious limitation on the unaligned
>> checkpoints in these cases.
>> Do you know what is the reason behind the current priority setting? Is
>> there a problem with processing the barrier here?
>>
>> Gyula
>>
>> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Well, I tried your example in a local mini-cluster, and it seems the source
>>> can take checkpoints but it will block in the following AsyncWaitOperator.
>>> IIUC, the unaligned checkpoint barrier should wait until the current
>>> `processElement` finishes its execution. In your example, the element queue
>>> of `AsyncWaitOperator` will end up full and `processElement` will be
>>> blocked at `addToWorkQueue`. Even though it will call
>>> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
>>> unprocessed since the priority of the barrier is -1, lower than the one
>>> `yield()` should handle. I verified this using single-step debugging.
>>>
>>> And if one element could finish its async io, the cp barrier can be
>>> processed afterwards. For example:
>>> ```
>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>> env.getCheckpointConfig().setCheckpointInterval(10000);  // 10s interval
>>> env.getConfig().setParallelism(1);
>>> AsyncDataStream.orderedWait(
>>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>         new AsyncFunction<Long, Long>() {
>>>             boolean first = true;
>>>
>>>             @Override
>>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>>>                 if (first) {
>>>                     Executors.newSingleThreadExecutor().execute(() -> {
>>>                         try {
>>>                             Thread.sleep(20000); // process after 20s, only for the first one
>>>                         } catch (Throwable e) {}
>>>                         LOG.info("Complete one");
>>>                         resultFuture.complete(Collections.singleton(1L));
>>>                     });
>>>                     first = false;
>>>                 }
>>>             }
>>>         },
>>>         24,
>>>         TimeUnit.HOURS,
>>>         1)
>>>         .print();
>>> ```
>>> Checkpoint 1 can then be finished normally after the "Complete one" log is
>>> printed.
>>>
>>> I guess the users have no means to solve this problem, we might optimize
>>> this later.
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra  wrote:
>>>
 Hey all!

 I encountered a strange and unexpected behaviour when trying to use
 unaligned checkpoints with AsyncIO.

 If the async operation queue is full and backpressures the pipeline
 completely, then unaligned checkpoints cannot be completed. To me this
 sounds counterintuitive because one of the benefits of the AsyncIO would be
 that we can simply checkpoint the queue and not have to wait for the
 completion.

 To repro you can simply run:

 AsyncDataStream.orderedWait(
         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
         new AsyncFunction<Long, Long>() {
             @Override
             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
         },
         24,
         TimeUnit.HOURS,
         1)
         .print();

 This pipeline will completely backpressure the source and checkpoints
 do not progress even though they are unaligned. Already the source cannot
 take a ch