Re: [DISCUSS] FLIP-443: Interruptible watermark processing

2024-05-05 Thread Zakelly Lan
Hi Piotr,

Thanks for the improvement, overall +1 for this. I'd leave a minor comment:

1. I'd suggest also providing `isInterruptable()` in `Mail`, with the
continuation mail returning true. FLIP-425 will leverage this queue
to execute some state requests, and when the checkpoint arrives, the operator
may call `yield()` to drain it. The continuation mail could then be executed
again inside `yield()`. By checking `isInterruptable()`, we can skip this mail
and re-enqueue it.
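
To make the idea concrete, here is a minimal toy sketch in Python. It is not
Flink's actual `Mail` or mailbox implementation: `ToyMailbox`,
`drain_in_yield` and the mail names are hypothetical, and only the
skip-and-re-enqueue behaviour suggested above is modelled.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List


@dataclass
class Mail:
    """A unit of work in the toy mailbox (hypothetical, not Flink's Mail)."""
    name: str
    action: Callable[[], None]
    interruptible: bool = False  # True only for continuation mails

    def is_interruptible(self) -> bool:
        return self.interruptible


class ToyMailbox:
    def __init__(self) -> None:
        self.queue: Deque[Mail] = deque()
        self.executed: List[str] = []

    def put(self, mail: Mail) -> None:
        self.queue.append(mail)

    def drain_in_yield(self) -> None:
        """Run every pending mail once, but skip continuation mails
        and put them back into the queue afterwards."""
        skipped: List[Mail] = []
        while self.queue:
            mail = self.queue.popleft()
            if mail.is_interruptible():
                skipped.append(mail)  # do not run it inside yield()
                continue
            mail.action()
            self.executed.append(mail.name)
        self.queue.extend(skipped)  # re-enqueue for the main mailbox loop


mailbox = ToyMailbox()
mailbox.put(Mail("state-request-1", lambda: None))
mailbox.put(Mail("timer-continuation", lambda: None, interruptible=True))
mailbox.put(Mail("state-request-2", lambda: None))
mailbox.drain_in_yield()
```

After the drain, both state requests have run and only the continuation mail
is left in the queue, so it is picked up later by the regular loop instead of
being re-entered from `yield()`.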


Best,
Zakelly

On Wed, May 1, 2024 at 4:35 PM Yanfei Lei  wrote:

> Thanks for your answers, Piotrek. I got it now.  +1 for this improvement.
>
> Best,
> Yanfei
>
> On Tue, Apr 30, 2024 at 9:30 PM Stefan Richter wrote:
> >
> >
> > Thanks for the improvement proposal, I’m +1 for the change!
> >
> > Best,
> > Stefan
> >
> >
> >
> > > On 30. Apr 2024, at 15:23, Roman Khachatryan  wrote:
> > >
> > > > Thanks for the proposal, I definitely see the need for this
> > > > improvement, +1.
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski wrote:
> > >
> > >> Hi Yanfei,
> > >>
> > >> Thanks for the feedback!
> > >>
> > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > >>> processes a watermark, the watermark will be sent to downstream, if
> > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > >>> is the watermark sent downstream?
> > >>
> > > >> The watermark would be emitted by an operator only once all relevant
> > > >> timers have fired. In other words, if the firing of timers is
> > > >> interrupted, a continuation mail is created to continue firing those
> > > >> interrupted timers. The watermark will be emitted downstream at the
> > > >> end of that continuation mail.
> > >>
> > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > >>
> > > >> Yes, the firing of processing-time and event-time timers shares the
> > > >> same code, and both will support interruptions in the same way.
> > > >> Actually, I've renamed the FLIP from
> > >>
> > >>> Interruptible watermarks processing
> > >>
> > >> to:
> > >>
> > >>> Interruptible timers firing
> > >>
> > >> to make this more clear.
> > >>
> > >> Best,
> > >> Piotrek
> > >>
> > > >> On Tue, Apr 30, 2024 at 6:08 AM Yanfei Lei wrote:
> > >>
> > >>> Hi Piotrek,
> > >>>
> > > >>> Thanks for this proposal. It looks like it will shorten the checkpoint
> > > >>> duration, especially in the case of back pressure. +1 for it! I'd
> > > >>> like to ask some questions to understand your thoughts more precisely.
> > >>>
> > >>> 1. Currently when AbstractStreamOperator or AbstractStreamOperatorV2
> > >>> processes a watermark, the watermark will be sent to downstream, if
> > >>> the `InternalTimerServiceImpl#advanceWatermark` is interrupted, when
> > >>> is the watermark sent downstream?
> > >>> 2. IIUC, processing-timer's firing is also encapsulated into mail and
> > >>> executed in mailbox. Is processing-timer allowed to be interrupted?
> > >>>
> > >>> Best regards,
> > >>> Yanfei
> > >>>
> > > >>> On Mon, Apr 29, 2024 at 9:57 PM Piotr Nowojski wrote:
> > >>>
> > > >>>>
> > > >>>> Hi all,
> > > >>>>
> > > >>>> I would like to start a discussion on FLIP-443: Interruptible watermark
> > > >>>> processing.
> > > >>>>
> > > >>>> https://cwiki.apache.org/confluence/x/qgn9EQ
> > > >>>>
> > > >>>> This proposal tries to make Flink's subtask thread more responsive when
> > > >>>> processing watermarks/firing timers, and make those operations
> > > >>>> interruptible/break them apart into smaller steps. At the same time, the
> > > >>>> proposed solution could potentially be adopted in other places in the
> > > >>>> code base as well, to solve similar problems with other flatMap-like
> > > >>>> operators (non-windowed joins, aggregations, CepOperator, ...).
> > > >>>>
> > > >>>> I'm looking forward to your thoughts.
> > > >>>>
> > > >>>> Best,
> > > >>>> Piotrek
> >
>
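
The behaviour discussed in this thread (timer firing is interrupted, a
continuation mail resumes it, and the watermark goes downstream only at the
end) can be sketched with a small Python simulation. This is an illustration
under stated assumptions, not Flink code: `advance_watermark`, the `budget`
parameter (a stand-in for whatever condition triggers an interruption) and
the plain `deque` used as a mailbox are all hypothetical.

```python
from collections import deque

log = []  # records the order in which things happen


def advance_watermark(timers, watermark, mailbox, budget=2):
    """Fire at most `budget` due timers. If more are due, enqueue a
    continuation mail; otherwise emit the watermark downstream."""
    fired = 0
    while timers and timers[0] <= watermark:
        if fired == budget:
            # Interrupted: let other mails run, continue later.
            mailbox.append(
                lambda: advance_watermark(timers, watermark, mailbox, budget)
            )
            return
        log.append(f"timer {timers.popleft()}")
        fired += 1
    # All relevant timers have fired, only now emit the watermark.
    log.append(f"watermark {watermark}")


timers = deque([1, 2, 3, 4, 5, 9])  # sorted timer timestamps
mailbox = deque()
mailbox.append(lambda: advance_watermark(timers, 5, mailbox))
mailbox.append(lambda: log.append("checkpoint"))  # some other pending mail

# Simplified mailbox loop: run mails in FIFO order until empty.
while mailbox:
    mailbox.popleft()()

print(log)
```

In this toy run the checkpoint mail gets its turn after the first two timers
instead of waiting for all five, and the watermark appears in the log only
after the last due timer has fired.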


Re: [VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-05 Thread Yuan Mei
+1(binding)

Best
Yuan

On Mon, May 6, 2024 at 11:28 AM Rui Fan <1996fan...@gmail.com> wrote:

> +1 (binding)
>
> Best,
> Rui
>
> On Mon, May 6, 2024 at 11:01 AM Yanfei Lei  wrote:
>
> > +1 (binding)
> >
> > Best,
> > Yanfei
> >
> > On Mon, May 6, 2024 at 11:00 AM Zakelly Lan wrote:
> > >
> > > +1 (binding)
> > >
> > > Thanks for driving this!
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Mon, May 6, 2024 at 10:54 AM yue ma  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > > Thanks for all the feedback, I'd like to start a vote on the FLIP-447:
> > > > > Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread is here
> > > > > [2].
> > > > >
> > > > > The vote will be open for at least 72 hours unless there is an objection or
> > > > > insufficient votes.
> > > >
> > > > [1]
> > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
> > > > [2] https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw
> > > >
> > > > --
> > > > Best,
> > > > Yue
> > > >
> >
>


RE: Re: [VOTE] Apache Flink CDC Release 3.1.0, release candidate #1

2024-05-05 Thread Xiqian YU
Hi Qingsheng,



Thanks! +1 (non-binding)



· Compiled & Ran unit tests successfully

· Ran ASF Incompatible license check

· Tested MySQL -> Doris / StarRocks E2e cases

· Tested transform projection & filter feature



Regards,

Xiqian Yu

On 2024/05/04 11:45:28 Ahmed Hamdy wrote:
> Hi Qingsheng,
>
> +1 (non-binding)
>
> - Verified checksums and hashes
> - Verified signatures
> - Verified github tag exists
> - Verified no binaries in source
> - build source
>
>
> Best Regards
> Ahmed Hamdy
>
>
> On Fri, 3 May 2024 at 23:03, Jeyhun Karimov wrote:
>
> > Hi Qingsheng,
> >
> > Thanks for driving the release.
> > +1 (non-binding)
> >
> > - No binaries in source
> > - Verified Signatures
> > - Github tag exists
> > - Build source
> >
> > Regards,
> > Jeyhun
> >
> > On Thu, May 2, 2024 at 10:52 PM Muhammet Orazov wrote:
> >
> > > Hey Qingsheng,
> > >
> > > Thanks a lot! +1 (non-binding)
> > >
> > > - Checked sha512sum hash
> > > - Checked GPG signature
> > > - Reviewed release notes
> > > - Reviewed GitHub web pr (added minor suggestions)
> > > - Built the source with JDK 11 & 8
> > > - Checked that src doesn't contain binary files
> > >
> > > Best,
> > > Muhammet
> > >
> > > On 2024-04-30 05:11, Qingsheng Ren wrote:
> > > > Hi everyone,
> > > >
> > > > Please review and vote on the release candidate #1 for the version
> > > > 3.1.0 of
> > > > Apache Flink CDC, as follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > > **Release Overview**
> > > >
> > > > As an overview, the release consists of the following:
> > > > a) Flink CDC source release to be deployed to dist.apache.org
> > > > b) Maven artifacts to be deployed to the Maven Central Repository
> > > >
> > > > **Staging Areas to Review**
> > > >
> > > > The staging areas containing the above mentioned artifacts are as
> > > > follows,
> > > > for your review:
> > > > * All artifacts for a) can be found in the corresponding dev repository
> > > > at
> > > > dist.apache.org [1], which are signed with the key with fingerprint
> > > > A1BD477F79D036D2C30CA7DBCA8AEEC2F6EB040B [2]
> > > > * All artifacts for b) can be found at the Apache Nexus Repository [3]
> > > >
> > > > Other links for your review:
> > > > * JIRA release notes [4]
> > > > * Source code tag "release-3.1.0-rc1" with commit hash
> > > > 63b42cb937d481f558209ab3c8547959cf039643 [5]
> > > > * PR for release announcement blog post of Flink CDC 3.1.0 in flink-web
> > > > [6]
> > > >
> > > > **Vote Duration**
> > > >
> > > > The voting time will run for at least 72 hours, adopted by majority
> > > > approval with at least 3 PMC affirmative votes.
> > > >
> > > > Thanks,
> > > > Qingsheng Ren
> > > >
> > > > [1] https://dist.apache.org/repos/dist/dev/flink/flink-cdc-3.1.0-rc1/
> > > > [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > [3]
> > > > https://repository.apache.org/content/repositories/orgapacheflink-1731
> > > > [4]
> > > >
> > >
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354387
> > > > [5] https://github.com/apache/flink-cdc/releases/tag/release-3.1.0-rc1
> > > > [6] https://github.com/apache/flink-web/pull/739
> > >
> >
>


Re: [VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-05 Thread Rui Fan
+1 (binding)

Best,
Rui

On Mon, May 6, 2024 at 11:01 AM Yanfei Lei  wrote:

> +1 (binding)
>
> Best,
> Yanfei
>
> On Mon, May 6, 2024 at 11:00 AM Zakelly Lan wrote:
> >
> > +1 (binding)
> >
> > Thanks for driving this!
> >
> >
> > Best,
> > Zakelly
> >
> > On Mon, May 6, 2024 at 10:54 AM yue ma  wrote:
> >
> > > Hi everyone,
> > >
> > > Thanks for all the feedback, I'd like to start a vote on the FLIP-447:
> > > Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread is here
> > > [2].
> > >
> > > The vote will be open for at least 72 hours unless there is an objection or
> > > insufficient votes.
> > >
> > > [1]
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
> > > [2] https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw
> > >
> > > --
> > > Best,
> > > Yue
> > >
>


Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-05 Thread Rui Fan
Thanks Xuannan for driving this proposal!

> taskmanager.network.memory.max-overdraft-buffers-per-gate will be removed
> and hard-coded to either 10 or 20.

Currently, it's a public option. Could we determine the value of
the overdraft buffer in the current FLIP?

I vote for 20 as the hard-coded value, for two reasons:
- Removing this option means users can no longer change it, so it is safer to
err on the higher side.
- Most tasks don't use the overdraft buffer, so increasing it doesn't
introduce more risk.

Best,
Rui

On Mon, May 6, 2024 at 10:47 AM Yuxin Tan  wrote:

> Thanks for the effort, Xuannan.
>
> +1 for the proposal.
>
> Best,
> Yuxin
>
>
> On Mon, Apr 29, 2024 at 3:40 PM Xintong Song wrote:
>
> > Thanks for driving this effort, Xuannan.
> >
> > +1 for the proposed changes.
> >
> > Just one suggestion: Some of the proposed changes involve not solely
> > changing the configuration options, but are bound to changing / removal of
> > certain features. E.g., the removal of hash-blocking shuffle and legacy
> > hybrid shuffle mode, and the behavior change of overdraft network buffers.
> > Therefore, it might be nicer to provide an implementation plan with a list
> > of related tasks in the FLIP. This should not block the FLIP though.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, Apr 25, 2024 at 4:35 PM Xuannan Su wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion on FLIP-450: Improve Runtime
> > > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we have
> > > revisited all runtime configurations and identified several
> > > improvements to enhance user-friendliness and maintainability. In this
> > > FLIP, we aim to refine the runtime configuration.
> > >
> > > Looking forward to everyone's feedback and suggestions. Thank you!
> > >
> > > Best regards,
> > > Xuannan
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
> > >
> >
>


Re: [VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-05 Thread Yanfei Lei
+1 (binding)

Best,
Yanfei

On Mon, May 6, 2024 at 11:00 AM Zakelly Lan wrote:
>
> +1 (binding)
>
> Thanks for driving this!
>
>
> Best,
> Zakelly
>
> On Mon, May 6, 2024 at 10:54 AM yue ma  wrote:
>
> > Hi everyone,
> >
> > Thanks for all the feedback, I'd like to start a vote on the FLIP-447:
> > Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread is here
> > [2].
> >
> > The vote will be open for at least 72 hours unless there is an objection or
> > insufficient votes.
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
> > [2] https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw
> >
> > --
> > Best,
> > Yue
> >


Re: [VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-05 Thread Zakelly Lan
+1 (binding)

Thanks for driving this!


Best,
Zakelly

On Mon, May 6, 2024 at 10:54 AM yue ma  wrote:

> Hi everyone,
>
> Thanks for all the feedback, I'd like to start a vote on the FLIP-447:
> Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread is here
> [2].
>
> The vote will be open for at least 72 hours unless there is an objection or
> insufficient votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
> [2] https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw
>
> --
> Best,
> Yue
>


Re: [DISCUSS] FLIP-444: Native file copy support

2024-05-05 Thread Zakelly Lan
Hi Piotr,

Thanks for the proposal. Speeding up the state download is meaningful. I
have some questions:

1. What are the semantics of `canCopyPath`? Should it be associated with a
specific destination path? E.g., a path might be copyable to local storage
but not to a remote FS.
2. Is the existing interface `DuplicatingFileSystem` feasible/sufficient for
this case?
3. Will extracting the interface introduce a breaking change?


Best,
Zakelly


On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko  wrote:

> Hi Piotr,
>
> Thanks for the proposal.
> How will adding s5cmd affect the memory footprint? Since this is a native
> binary, memory consumption will not be controlled by JVM or Flink.
>
> Thanks,
> Aleksandr
>
> On Thu, 2 May 2024 at 11:12, Hong Liang  wrote:
>
> > Hi Piotr,
> >
> > Thanks for the FLIP! Nice to see work to improve the filesystem
> > performance. +1 to future work to improve the upload speed as well. This
> > would be useful for jobs with large state and high Async checkpointing
> > times.
> >
> > Some thoughts on the configuration: it might be good for us to introduce
> > two points of configurability for future-proofing:
> > 1/ Configure the implementation of PathsCopyingFileSystem used, maybe by
> > config, or by ServiceResources (this would allow us to use this for
> > alternative clouds/Implement S3 SDKv2 support if we want this in the
> > future). Also this could be used as a feature flag to determine if we
> > should be using this new native file copy support.
> > 2/ Configure the location of the s5cmd binary (version control etc.), as
> > you have mentioned in the FLIP.
> >
> > Regards,
> > Hong
> >
> >
> > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov
> >  wrote:
> >
> > > Hey Piotr,
> > >
> > > Thanks for the proposal! It would be great improvement!
> > >
> > > Some questions from my side:
> > >
> > > > In order to configure s5cmd Flink’s user would need
> > > > to specify path to the s5cmd binary.
> > >
> > > Could you please also add the configuration property
> > > for this? An example showing how users would set this
> > > parameter would be helpful.
> > >
> > > Would this affect any filesystem connectors that use
> > > FileSystem[1][2] dependencies?
> > >
> > > Best,
> > > Muhammet
> > >
> > > [1]:
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > [2]:
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > >
> > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > Hi all!
> > > >
> > > > I would like to put under discussion:
> > > >
> > > > FLIP-444: Native file copy support
> > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > >
> > > > This proposal aims to speed up Flink recovery times, by speeding up
> > > > state download times. However, in the future, the same mechanism could
> > > > also be used to speed up state uploading (checkpointing/savepointing).
> > > >
> > > > I'm curious to hear your thoughts.
> > > >
> > > > Best,
> > > > Piotrek
> > >
> >
>


[VOTE] FLIP-447: Upgrade FRocksDB from 6.20.3 to 8.10.0

2024-05-05 Thread yue ma
Hi everyone,

Thanks for all the feedback, I'd like to start a vote on the FLIP-447:
Upgrade FRocksDB from 6.20.3 to 8.10.0 [1]. The discussion thread is here
[2].

The vote will be open for at least 72 hours unless there is an objection or
insufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-447%3A+Upgrade+FRocksDB+from+6.20.3++to+8.10.0
[2] https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw

-- 
Best,
Yue


Re: [DISCUSSION] FLIP-450: Improve Runtime Configuration for Flink 2.0

2024-05-05 Thread Yuxin Tan
Thanks for the effort, Xuannan.

+1 for the proposal.

Best,
Yuxin


On Mon, Apr 29, 2024 at 3:40 PM Xintong Song wrote:

> Thanks for driving this effort, Xuannan.
>
> +1 for the proposed changes.
>
> Just one suggestion: Some of the proposed changes involve not solely
> changing the configuration options, but are bound to changing / removal of
> certain features. E.g., the removal of hash-blocking shuffle and legacy
> hybrid shuffle mode, and the behavior change of overdraft network buffers.
> Therefore, it might be nicer to provide an implementation plan with a list
> of related tasks in the FLIP. This should not block the FLIP though.
>
> Best,
>
> Xintong
>
>
>
> On Thu, Apr 25, 2024 at 4:35 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I'd like to start a discussion on FLIP-450: Improve Runtime
> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we have
> > revisited all runtime configurations and identified several
> > improvements to enhance user-friendliness and maintainability. In this
> > FLIP, we aim to refine the runtime configuration.
> >
> > Looking forward to everyone's feedback and suggestions. Thank you!
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0
> >
>


[RESULT][VOTE] FLIP-445: Support dynamic parallelism inference for HiveSource

2024-05-05 Thread Xia Sun
Hi all,

FLIP-445: Support dynamic parallelism inference for HiveSource[1] has been
accepted and voted through this thread [2].

The proposal has been accepted with 6 approving votes (5 binding) and there
is no disapproval:

- Muhammet Orazov (non-binding)
- Rui Fan (binding)
- Ron Liu (binding)
- Zhu Zhu (binding)
- Lijie Wang (binding)
- yuxia (binding)

Thanks to all involved.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-445%3A+Support+dynamic+parallelism+inference+for+HiveSource
[2] https://lists.apache.org/thread/lktnb162l2z3042m76to6xfbsdndy4r7

Best,
Xia


Re: Flink stateful functions and Agentic Architecture

2024-05-05 Thread Yunfeng Zhou
Hi David,

I'm not very familiar with stateful functions, but I have worked on
Flink ML, a machine learning infrastructure and algorithm library
based on Flink. There we developed features such as an iteration-based
model-training process and hot updates of ML models during online
prediction. You may want to check whether these features and their
designs are of interest to you.
https://nightlies.apache.org/flink/flink-ml-docs-release-2.3/docs/development/iteration/
https://github.com/apache/flink-ml

Best,
Yunfeng

On Mon, Apr 29, 2024 at 9:35 PM David Carroll  wrote:
>
> I am a systems architect developing a POC concept for an AI product using
> Agentic Architecture with generative LLMs. It occurred to me that it could
> be possible to use Flink stateful functions to provide the event driven
> communications and execution environment for agents built with a framework
> like langchain or langgraph. Such a system could be multi-tenant and fully
> scalable.
>
> I wonder if anyone has explored using Flink stateful functions in this way
> or has suggestions for examples to look at for similar use cases?
> --
> David Carroll
> Chief of Research & Development
> 
> da...@spotter.la  | 310.569.5103


[jira] [Created] (FLINK-35291) Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme

2024-05-05 Thread LiuZeshan (Jira)
LiuZeshan created FLINK-35291:
-

             Summary: Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme
                 Key: FLINK-35291
                 URL: https://issues.apache.org/jira/browse/FLINK-35291
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: 1.20.0
            Reporter: LiuZeshan
             Fix For: 1.20.0
         Attachments: cdc-3.0-1c-2.html, cdc-3.0-1c.html, image-2024-05-06-00-29-34-618.png, image-2024-05-06-00-37-16-028.png

We are doing performance testing on Flink CDC 3.0 and found, through Arthas
profiling, that there is a significant performance bottleneck in the
serialization of row data. The main problem lies in the String.format call in
the BinaryRecordDataGenerator class, so we have made some simple performance
optimizations.
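
As a language-neutral illustration of this kind of bottleneck (the actual
patch is not shown in this message), the Python sketch below contrasts
building a formatted message on every record with building it only when a
check fails. All names and the message template are hypothetical; Python's
`%` formatting merely stands in for Java's String.format.

```python
TEMPLATE = "types and values must have the same length: %d types, %d values"


def check_eager(condition, message):
    """The caller has already paid for formatting `message`."""
    if not condition:
        raise ValueError(message)


def check_lazy(condition, template, *args):
    """Formatting happens only on the failure path."""
    if not condition:
        raise ValueError(template % args)


def write_row_eager(fields, arity):
    # The message is formatted for every row, even though the check
    # almost always passes on the hot path.
    check_eager(len(fields) == arity, TEMPLATE % (arity, len(fields)))
    return tuple(fields)


def write_row_lazy(fields, arity):
    # Same check and same error message, but no formatting on success.
    check_lazy(len(fields) == arity, TEMPLATE, arity, len(fields))
    return tuple(fields)
```

Both variants behave identically for callers. The difference is only where
the formatting cost is paid, which matters when the function runs once per
record.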

test environment:
 * flink: 1.20-SNAPSHOT master
 * flink-cdc: 3.2-SNAPSHOT master
 * 1CU minicluster mode

{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3308
  username: root
  password: 123456
  tables: test.user_behavior
  server-id: 5400-5404
  #server-time-zone: UTC
  scan.startup.mode: earliest-offset
  debezium.poll.interval.ms: 10

sink:
  type: values
  name: Values Sink
  materialized.in.memory: false
  print.enabled: false

pipeline:
  name: Sync MySQL Database to Values
  parallelism: 1{code}
 

*before optimization: 3.5w/s* 
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179!

[^cdc-3.0-1c.html]

Analyzing the flame graph shows that approximately 24.45% of the time is
spent in String.format.

!image-2024-05-06-00-29-34-618.png|width=583,height=171!

 

*after optimization: 5w/s* 

!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174!
 
 [^cdc-3.0-1c-2.html]

After optimization, 4.7% (extractBeforeDataRecord + extractAfterDataRecord) of
the time is still spent in
org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator..
Perhaps we can optimize it further.
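
For reference, the before/after figures can be turned into a relative gain
with a few lines of Python. This assumes that "w" is the common shorthand for
10,000 (so 3.5w/s is 35,000 per second); the variable names are illustrative
only.

```python
WAN = 10_000  # assumption: "w" abbreviates 10,000

before = 3.5 * WAN  # throughput before optimization, per second
after = 5 * WAN     # throughput after optimization, per second

speedup = after / before
improvement_pct = (after - before) / before * 100

print(f"{speedup:.2f}x, +{improvement_pct:.1f}%")  # prints "1.43x, +42.9%"
```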

!image-2024-05-06-00-37-16-028.png|width=379,height=107!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35290) Wrong Instant type conversion TableAPI to Datastream in thread mode

2024-05-05 Thread Wouter Zorgdrager (Jira)
Wouter Zorgdrager created FLINK-35290:
-

             Summary: Wrong Instant type conversion TableAPI to Datastream in thread mode
                 Key: FLINK-35290
                 URL: https://issues.apache.org/jira/browse/FLINK-35290
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.18.1
            Reporter: Wouter Zorgdrager


In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` type into a
DataStream, you get a `pyflink.common.time.Instant` type. First of all, I'm
wondering if this is expected behavior, since in the Table API `TIMESTAMP_LTZ`
maps to a Python `datetime`. Can't the same be done for the DataStream API?
Nevertheless, if we switch from `process` to `thread` mode for execution, the
`TIMESTAMP_LTZ(3)` gets mapped to `pemja.PyJObject` (which wraps a
`java.time.Instant`) rather than `pyflink.common.time.Instant`. Note that if I
only use the DataStream API and read `Types.Instant()` directly, the conversion
in both `thread` and `process` mode seems to work just fine.

Below is a minimal example exposing the bug:

```
from datetime import datetime

from pyflink.common import Configuration, Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment

EXECUTION_MODE = "thread"  # or "process"
# Note: `config` is not passed to the environment below; the execution
# mode that takes effect is the one set on t_env's config.
config = Configuration()
config.set_string("python.execution-mode", EXECUTION_MODE)

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("python.fn-execution.bundle.size", "1")
t_env.get_config().set("python.execution-mode", EXECUTION_MODE)


def to_epoch_ms(row: Row):
    # Prints the runtime type of the timestamp field, which differs
    # between "process" and "thread" mode.
    print(type(row[1]))
    return row[1].to_epoch_milli()


t_env.to_data_stream(
    t_env.from_elements(
        [
            (1, datetime(year=2024, day=10, month=9, hour=9)),
            (2, datetime(year=2024, day=10, month=9, hour=12)),
            (3, datetime(year=2024, day=22, month=11, hour=12)),
        ],
        DataTypes.ROW(
            [
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
            ]
        ),
    )
).map(to_epoch_ms, output_type=Types.LONG()).print()
env.execute()
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)