Synchronizing streams in CoProcessFunction

2022-06-26 Thread Gopi Krishna M
Hi,
I have a scenario with connected streams, where one is a low-throughput
metadata stream and the other is a high-throughput data stream. I use a
CoProcessFunction that operates on the data stream, with its behavior
controlled by the metadata stream.

Is there a way to slow down or pause the high-throughput data stream until
I've received one entry from the metadata stream? By the time I get the first
element from the metadata stream, I might already have received thousands of
items from the data stream. One option is to create state inside the operator
to buffer the data stream. Is there any other option that doesn't require this
state management?
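
For reference, a minimal Java sketch of the state-buffering option mentioned
above, assuming the two streams are keyed and connected; the String element
types, state names, and the process() helper are placeholders:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Buffers data-stream elements in keyed state until the first metadata
 * element for the same key arrives, then flushes them and processes
 * subsequent elements directly.
 */
public class BufferUntilMetadataFunction
        extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ValueState<String> metadata;
    private transient ListState<String> buffer;

    @Override
    public void open(Configuration parameters) {
        metadata = getRuntimeContext().getState(
                new ValueStateDescriptor<>("metadata", Types.STRING));
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-data", Types.STRING));
    }

    @Override
    public void processElement1(String data, Context ctx, Collector<String> out)
            throws Exception {
        String meta = metadata.value();
        if (meta == null) {
            // No metadata yet: park the element in state instead of emitting it.
            buffer.add(data);
        } else {
            out.collect(process(data, meta));
        }
    }

    @Override
    public void processElement2(String meta, Context ctx, Collector<String> out)
            throws Exception {
        metadata.update(meta);
        // Flush everything that arrived before the metadata.
        for (String data : buffer.get()) {
            out.collect(process(data, meta));
        }
        buffer.clear();
    }

    private String process(String data, String meta) {
        return meta + ":" + data; // placeholder business logic
    }
}
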

Thanks,
Gopi


Re: [ANNOUNCE] Apache Flink 1.14.5 released

2022-06-26 Thread Qingsheng Ren
Thanks Xingbo for driving this release!

Best, 
Qingsheng

> On Jun 22, 2022, at 11:50, Xingbo Huang  wrote:
> 
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.14.5, which is the fourth bugfix release for the Apache Flink 1.14 
> series.
> 
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html
>  
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2022/06/22/release-1.14.5.html
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351388
>  
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
>  
> Regards,
> Xingbo



Re: Can the kafka-client version requirement of the Kafka source/sink in Flink 1.15 be lowered?

2022-06-26 Thread Qingsheng Ren
Hi,

The Kafka connector currently relies on some APIs that are only available in newer 
kafka-clients versions, and the sink side uses reflection to support exactly-once 
semantics. Since the Kafka client itself provides good backward compatibility, the 
Flink community no longer provides Kafka connectors built against older client 
versions, and adapting to Kafka 0.11, which was released five years ago, is not 
really practical.

Best,
Qingsheng

> On Jun 23, 2022, at 19:37, yidan zhao  wrote:
> 
> As the subject says: I'd like to ask anyone familiar with this area whether the
> connector merely upgraded to a newer kafka-client and adjusted some API usage, or
> whether it actually depends on functionality that only exists in the newer client?
> Would it be possible to implement it on top of an older kafka-client?
> 
> If it is possible, I may override the implementation myself.
> The reason is that newer kafka-client versions cannot talk to my company's Kafka,
> which is open-source Kafka wrapped with a proxy layer. Accessing it with a
> too-new client causes problems (0.11 is recommended; in my tests clients up to
> 2.2 work).



Re: Flink k8s Operator on AWS?

2022-06-26 Thread Yang Wang
Could you please share the JobManager logs of the failed deployment? It will
also help a lot if you could show the pending pod status via "kubectl
describe <pod>".

Given that the current Flink Kubernetes Operator is built on top of the native
K8s integration [1], the Flink ResourceManager should allocate enough
TaskManager pods automatically.
We need to find out what is wrong from the logs. Maybe it is the service
account, a taint, or something else.


[1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html


Best,
Yang

Matt Casters  wrote on Friday, June 24, 2022 at 23:48:

> Yes, of course.  I already feel a bit less intelligent for having asked the
> question ;-)
>
> The status now is that I've managed to puzzle it all together.  Copying
> the files from S3 to an ephemeral volume takes all of 2 seconds, so it's
> really not an issue.  The cluster starts, and our fat jar and Apache Hop
> MainBeam class are found and started.
>
> The only thing that remains is figuring out how to configure the Flink
> cluster itself.  I have a couple of m5.large ec2 instances in a node group
> on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
> in the pipeline can't seem to find resources to start.
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
>
> Parallelism was set to 1 for the runner and there are only 2 tasks in my
> first Beam pipeline so it should be simple enough but it just times out.
>
> Next step for me is to document the result which will end up on
> hop.apache.org.   I'll probably also want to demo this in Austin at the
> upcoming Beam summit.
>
> Thanks a lot for your time and help so far!
>
> Cheers,
> Matt
>
>


Re: How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

2022-06-26 Thread Dian Fu
Hi John,

This seems like a bug and I have created a ticket
https://issues.apache.org/jira/browse/FLINK-28253 to track it.

For now, you could try replacing to_data_stream with to_append_stream to
see if it works.

Regards,
Dian

On Sat, Jun 25, 2022 at 4:07 AM John Tipper  wrote:

> Hi,
>
> I have a source table using a Kinesis connector reading events from AWS
> EventBridge using PyFlink 1.15.0. An example of the sorts of data that are
> in this stream is here:
> https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
> Note that the stream of data contains many different types of events, where
> the 'detail' field is completely different between different event types.
> There is no support for this connector using PyFlink DataStream API, so I
> use the Table API to construct the source table.  The table looks like this:
>
>
> CREATE TABLE events (
>  `id` VARCHAR,
>  `source` VARCHAR,
>  `account` VARCHAR,
>  `region` VARCHAR,
>  `detail-type` VARCHAR,
>  `detail` VARCHAR,
>  `resources` VARCHAR,
>  `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
>  WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
>  PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> ...
> )
>
>
>
> The table was created using:
>
>  table_env.execute_sql(CREATE_STRING_ABOVE)
>
> I'd like to turn this table into a data stream so I can perform some
> processing that is easier to do in the DataStream API:
>
>
> events_stream_table = table_env.from_path('events')
>
> events_stream = table_env.to_data_stream(events_stream_table)
>
> # now do some processing - let's filter by the type of event we get
>
> codebuild_stream = events_stream.filter(
> lambda event: event['source'] == 'aws.codebuild'
> )
>
> # now do other stuff on a stream containing only events that are identical
> in shape
> ...
> # maybe convert back into a Table and perform SQL on the data
>
>
> When I run this, I get an exception:
>
>
> org.apache.flink.table.api.TableException: Unsupported conversion from data type
> 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to
> type information. Only data types that originated from type information fully
> support a reverse conversion.
>
>
> Somebody reported a similar error here (
> https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception)
> When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL
> TIME ZONE" with a "TIMESTAMP(3)" I get a different exception:
>
> TypeError: The java type info: LocalDateTime is not supported in PyFlink
> currently.
>
>
> Is there a way of converting this Table into a DataStream (and then back
> again)? I need to use the data in the "time" field as the source of
> watermarks for my events.
>
> Many thanks,
>
> John
>


Re: Data loss when using Flink to consume from Kafka and sync to MongoDB in real time

2022-06-26 Thread Qingsheng Ren
Hi,

The Flink Kafka connector commits offsets to the Kafka broker after a checkpoint 
completes, but Flink does not rely on the offsets committed to the broker for 
failure recovery; it restores from the offsets stored in the checkpoint.

Regarding the data loss, I'd suggest first reproducing the problem with a small 
amount of data, and then checking step by step from source to sink.
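
For illustration, a minimal Java sketch of a checkpointed job using the new
KafkaSource API; the bootstrap servers, topic, and group id below are
placeholders, and the comments summarize the offset behavior described above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaOffsetBehaviorSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are snapshotted into every checkpoint and committed back to the
        // broker only after the checkpoint completes; on failure Flink restores
        // the offsets from the checkpoint, not from the broker.
        env.enableCheckpointing(60_000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")          // placeholder
                .setTopics("mysql-changelog")               // placeholder
                .setGroupId("mysql2mongodb")                // placeholder
                // Only consulted on a fresh start with no checkpoint/savepoint state.
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();

        env.execute("kafka-offset-behavior");
    }
}
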

Best,
Qingsheng

> On Jun 26, 2022, at 11:54, casel.chen  wrote:
> 
> mysql cdc -> kafka -> mongodb
> I wrote a Flink 1.13.2 job that consumes a whole-database MySQL changelog topic
> from Kafka and writes it to MongoDB in real time, with checkpointing enabled.
> In practice, however, I found that restoring from a savepoint or from group
> offsets loses data. How should I investigate this? Code repository:
> https://github.com/ChenShuai1981/mysql2mongodb.git
> My MongodbSink implements CheckpointedFunction, and its snapshotState method
> waits for all child threads to finish writing to MongoDB.
> 
> 
> What is the process by which Flink commits Kafka offsets after consuming and
> processing the data? Consumption starts by obtaining pendingOffsets; how does
> Flink make sure these pendingOffsets are all processed before committing them?
> Is there any source-code walkthrough covering this part?
> 



Re: Flink-1.15.0 fails to commit Kafka offsets?

2022-06-26 Thread Qingsheng Ren
Hi,

This is a known issue in the Apache Kafka consumer; see FLINK-28060 [1] and KAFKA-13840 [2].

[1] https://issues.apache.org/jira/browse/FLINK-28060
[2] https://issues.apache.org/jira/browse/KAFKA-13840

Best,
Qingsheng

> On Jun 27, 2022, at 09:16, RS  wrote:
> 
> Hi,
> A question for the list: with Flink-1.15.0 consuming from Kafka, I am seeing
> offset commit failures. Some jobs apparently never manage to commit: the data
> is consumed, but the offsets do not move. How should this be handled?
> 
> 
> Symptoms:
> 1. The job shows no exceptions.
> 2. Data is consumed and processed normally; downstream use of the data is not affected.
> 3. Checkpointing is configured every few minutes; in theory offsets are committed when a checkpoint runs.
> 4. Some jobs fail to commit their Kafka offsets, while others commit normally.
> 
> 
> WARN logs:
> 2022-06-27 01:07:42,725 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 checkpointing for checkpoint with id=11398 (max part counter=1).
> 2022-06-27 01:07:42,830 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 received completion notification for checkpoint with id=11398.
> 2022-06-27 01:07:43,820 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 checkpointing for checkpoint with id=11476 (max part counter=0).
> 2022-06-27 01:07:43,946 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 received completion notification for checkpoint with id=11476.
> 2022-06-27 01:07:45,218 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 checkpointing for checkpoint with id=11521 (max part counter=47).
> 2022-06-27 01:07:45,290 INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 
> 0 received completion notification for checkpoint with id=11521.
> 2022-06-27 01:07:45,521 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 11443
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> 2022-06-27 01:07:45,990 WARN  
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed 
> to commit consumer offsets for checkpoint 11398
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing the 
> latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> 
> 
> Thanks~



Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-06-26 Thread Dian Fu
Hi John,

Kinesis and most of the other connectors will be supported in 1.16; see [1]
for more details about Kinesis.

For versions prior to 1.16, you could try what Andrew suggested, or refer
to the implementations that are already available on master as examples.

Regards,
Dian

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors/kinesis.py

On Fri, Jun 24, 2022 at 9:20 PM Andrew Otto  wrote:

> I've had success using the Java classes from PyFlink via pyflink.java_gateway.
> Something like:
>
> from pyflink.java_gateway import get_gateway
> jvm = get_gateway()
>
> # then perhaps something like:
> FlinkKinesisConsumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
>
> There also seems to be a nice java_utils.py with helpers that may, uh, help.
>
> Not sure if this will work; you might need to use the Python env's Java
> StreamTableEnvironment to do it?  Here's an example
> of how the Python StreamTableEnvironment calls out to the Java one.
>
> BTW: I'm not an authority, nor have I really tried this, so take this
> advice with a grain of salt!  :)
>
> Good luck!
>
>
>
>
>
>
> On Fri, Jun 24, 2022 at 9:06 AM John Tipper 
> wrote:
>
>> Hi all,
>>
>> There are a number of connectors which do not appear to be in the Python
>> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
>> connectors by using the Table API:
>>
>> CREATE TABLE my_table (...)
>> WITH ('connector' = 'kinesis' ...)
>>
>>
>> I guess if you wanted the stream as a DataStream you'd create the Table
>> and then convert it into a DataStream?
>>
>> Is there a way of directly instantiating these connectors in PyFlink
>> without needing to use SQL like this (and without having to wait until
>> v1.16)? e.g. the Java API looks like this:
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>> "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>>
>>
>> Many thanks,
>>
>> John
>>
>


Re: Re: lookup join task cannot recover properly?

2022-06-26 Thread xuchao
Yes, I'm using the JDBC connector built into Flink SQL.

Thanks! 



Best,
Amber Xu
 
From: Lincoln Lee
Sent: 2022-06-24 21:27
To: user-zh
Subject: Re: lookup join task cannot recover properly?
Hi,
   Is the MySQL dimension table you are using the JDBC connector built into Flink
SQL? If so, its internal cache is only a read cache and is not persisted; it is
dropped when the job restarts or when the configured eviction conditions are reached.
   If it is a dimension-table connector you developed yourself, I'd suggest adding
logs around the data loading so you can confirm whether anything goes wrong during
failover.
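
For illustration, a minimal Java sketch of a built-in JDBC dimension table with
the cache options mentioned above; the connection settings and schema are
placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcLookupCacheSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The JDBC lookup cache is an in-memory read cache only: it starts empty
        // after every (re)start and is filled lazily, one lookup key at a time,
        // until 'lookup.cache.max-rows' or 'lookup.cache.ttl' evicts entries.
        tEnv.executeSql(
                "CREATE TABLE dim_user (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://mysql-host:3306/db',\n"
                        + "  'table-name' = 'dim_user',\n"
                        + "  'username' = '...',\n"
                        + "  'password' = '...',\n"
                        + "  'lookup.cache.max-rows' = '10000',\n"
                        + "  'lookup.cache.ttl' = '10min'\n"
                        + ")");
    }
}
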
 
Best,
Lincoln Lee
 
 
Xuchao  wrote on Friday, June 24, 2022 at 17:15:
 
> Hello!
> I've run into some problems using Flink.
> flink-1.14.4
> sqlserver-cdc-2.2.1
> yarn-per-job
>
> I have a job that first performs a stream-stream join and then a lookup join
> against a MySQL dimension table, with incremental checkpoints enabled.
> After a brief sqlserver-cdc failure the job failed and recovered automatically,
> but the lookup join task no longer outputs any data.
> On inspection, the amount of dimension-table data loaded is 0, i.e. the full
> dimension table was not loaded when the job recovered.
>
> What could be the cause, and how should it be resolved?
>
> Looking forward to a reply!
> best wishes!
>
> Attached log:
> 2022-06-24 14:55:45,950 ERROR
> com.ververica.cdc.debezium.internal.Handover [] - Reporting
> error:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the change event producer. This connector will be
> stopped.
> at
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_301]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_301]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient
> number of arguments were supplied for the procedure or function
> cdc.fn_cdc_get_all_changes_ ...
> at
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> ... 7 more
> 2022-06-24 14:55:45,953 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,954 INFO
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
> [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> carflow]], fields=[id, plate_license, site_id, create_time, flow_type,
> circle_id]) -> Calc(select=[id, plate_license, site_id, create_time,
> (create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id])
> -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
> discarding 0 drained requests
> 2022-06-24 14:55:45,955 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,957 WARN  org.apache.flink.runtime.taskmanager.Task
> [] - Source: TableSourceScan(table=[[default_catalog,
> default_database, carflow]], fields=[id, plate_license, site_id,
> create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license,
> site_id, create_time, (create_time + -2880:INTERVAL HOUR) AS c_time,
> flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time],
> watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched
> from RUNNING to FAILED with failure cause:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the 

Flink-1.15.0 fails to commit Kafka offsets?

2022-06-26 Thread RS
Hi,
A question for everyone: with Flink-1.15.0 consuming from Kafka, I am seeing
offset commit failures. Some jobs apparently never manage to commit: the data is
consumed, but the offsets do not move. How should this be handled?


Symptoms:
1. The job shows no exceptions.
2. Data is consumed and processed normally; downstream use of the data is not affected.
3. Checkpointing is configured every few minutes; in theory offsets are committed when a checkpoint runs.
4. Some jobs fail to commit their Kafka offsets, while others commit normally.


WARN logs:
2022-06-27 01:07:42,725 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=11398 (max part counter=1).
2022-06-27 01:07:42,830 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=11398.
2022-06-27 01:07:43,820 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=11476 (max part counter=0).
2022-06-27 01:07:43,946 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=11476.
2022-06-27 01:07:45,218 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
checkpointing for checkpoint with id=11521 (max part counter=47).
2022-06-27 01:07:45,290 INFO  
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 
received completion notification for checkpoint with id=11521.
2022-06-27 01:07:45,521 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 11443
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
2022-06-27 01:07:45,990 WARN  
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to 
commit consumer offsets for checkpoint 11398
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit 
failed with a retriable exception. You should retry committing the latest 
consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.


Thanks~

Re: How to make current application cdc

2022-06-26 Thread yuxia
> I mean CDC should be handled on the Kafka side. 
What do you mean by that? Do you mean that Kafka should store the messages in a 
CDC format such as Debezium [1], Canal [2], Maxwell [3], or OGG [4]? 

> Or should I need to use Table API 

I'm afraid not. It seems you can still use the Flink DataStream API, as the Table 
API makes no difference for your case. 

BTW, you can try flink cdc [5] 
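
For concreteness, if the messages in Kafka were already stored in, e.g., Debezium 
JSON [1], a minimal Table API sketch for reading them as a changelog could look 
like this; the topic name, schema, and addresses are hypothetical: 

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumFromKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'debezium-json' tells Flink to interpret the Kafka messages as a
        // changelog (INSERT/UPDATE/DELETE) instead of plain JSON rows.
        tEnv.executeSql(
                "CREATE TABLE orders_cdc (\n"
                        + "  order_id BIGINT,\n"
                        + "  amount DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders.changes',\n"
                        + "  'properties.bootstrap.servers' = 'kafka:9092',\n"
                        + "  'properties.group.id' = 'cdc-demo',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'debezium-json'\n"
                        + ")");

        // Downstream SQL now sees updates/retractions rather than raw strings.
        tEnv.executeSql("SELECT * FROM orders_cdc").print();
    }
}
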

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/ 
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/ 
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/maxwell/ 
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/ogg/ 
[5] https://ververica.github.io/flink-cdc-connectors/ 


Best regards, 
Yuxia 


发件人: "Sid"  
收件人: "User"  
发送时间: 星期六, 2022年 6 月 25日 下午 6:32:22 
主题: How to make current application cdc 

Hello, 

I have a current flow where the data from the Flink-Kafka connector is captured 
and processed using the Flink DataStream API and stored in Kafka topics. However, 
I would like to make it CDC-enabled. I went through an article which mentioned 
that it should be handled on the Kafka side while capturing the data, i.e. that 
CDC should be handled on the Kafka side. Or do I need to use the Table API? 
concepts. 

TIA, 
Sid 



Email from 杨柳

2022-06-26 Thread 杨柳
Unsubscribe

Unsubscribe

2022-06-26 Thread lian
Unsubscribe



Re: How to make current application cdc

2022-06-26 Thread Sid
Hi team,

Any help here please?

Thanks,
Sid

On Sat, Jun 25, 2022 at 4:02 PM Sid  wrote:

> Hello,
>
> I have a current flow where the data from the Flink-Kafka connector is
> captured and processed using the Flink DataStream API and stored in Kafka
> topics. However, I would like to make it CDC-enabled. I went through an
> article which mentioned that it should be handled on the Kafka side while
> capturing the data, i.e. that CDC should be handled on the Kafka side.
> Or do I need to use the Table API?
>
> So, any ideas/links are much appreciated as I am trying to understand
> these concepts.
>
> TIA,
> Sid
>