Re: Flink sink data to DB and then commit data to Kafka
Hi Qihua,

AFAIK there is no way to do it out of the box. Maybe you need to implement a
"new" sink to achieve this goal.

Best,
Guowei

On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang wrote:
> Hi,
>
> Our Flink application has two sinks (a DB and a Kafka topic). We want to
> push the same data to both sinks. Is it possible to push data to the Kafka
> topic only after the data has been pushed to the DB successfully? If the
> commit to the DB fails, we don't want that data pushed to Kafka.
>
> Thanks,
> Qihua
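[Editor's note: a minimal sketch of what such a hand-rolled sink could look like, assuming a JDBC-accessible database and the plain Kafka producer client. The class name, connection strings, table, and topic below are hypothetical placeholders, not anything from the thread.]

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.util.Properties;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical custom sink: insert into the DB first, and publish to
    // Kafka only if the insert succeeds.
    public class DbThenKafkaSink extends RichSinkFunction<String> {

        private transient Connection connection;
        private transient KafkaProducer<String, String> producer;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Placeholder connection details.
            connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "pass");
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka-host:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // Insert into the DB; if this throws, we never reach the Kafka send.
            try (PreparedStatement stmt = connection.prepareStatement(
                     "INSERT INTO my_table (payload) VALUES (?)")) {
                stmt.setString(1, value);
                stmt.executeUpdate();
            }
            // Only reached after a successful insert.
            producer.send(new ProducerRecord<>("my-topic", value));
        }

        @Override
        public void close() throws Exception {
            if (producer != null) producer.close();
            if (connection != null) connection.close();
        }
    }

Note that a sketch like this gives at-least-once semantics at best: on a restart, records replayed since the last checkpoint would be inserted and published again, so dedup or idempotent writes would be needed for stronger guarantees.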
Re: Flink sink data to DB and then commit data to Kafka
An alternative is to use a CDC tool like Debezium to stream your table
changes, and then ingest that stream with Flink to push the data on to
Kafka.
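[Editor's note: a sketch of what that relay job could look like with Flink's Kafka connectors, assuming Debezium already publishes the table's change events to a Kafka topic. The broker address and topic names are hypothetical placeholders.]

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CdcRelayJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Debezium writes committed DB changes to its own topic;
            // the topic name here is a placeholder.
            KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-host:9092")
                .setTopics("dbserver1.public.my_table")
                .setGroupId("cdc-relay")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

            KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("kafka-host:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("my-topic")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "debezium-changes")
               // Unwrap / filter the Debezium JSON envelope here as needed.
               .sinkTo(sink);

            env.execute("CDC relay");
        }
    }

The attraction of this design is that Debezium only emits changes that were actually committed to the database, which is exactly the "Kafka only after the DB commit" guarantee being asked for.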
Re: Flink sink data to DB and then commit data to Kafka
Hello Qihua,

If you do not care about the events that are not committed to the DB, you
can use Async I/O [1] and implement logic that

   - does the database inserts
   - completes only the original events that were accepted by the DB

You can then sink this new datastream to Kafka.

If you are also interested in the events that are not committed to the DB,
you can use a ProcessFunction [2] and implement logic that

   - does the database inserts
   - collects only the original events that were accepted by the DB
   - sends the ones that were not accepted by the DB to a side output

You can then sink this new datastream to Kafka, and maybe the side output
to another topic.

Sincerely,

Ali Bahadir Zeybek

[1]: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio
[2]: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function
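[Editor's note: a minimal sketch of the first (Async I/O) approach, assuming a JDBC database and plain string events; the class name, connection details, and table are hypothetical placeholders.]

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Hypothetical async function: completes with the original event only
    // after the database has accepted the insert.
    public class DbInsertAsyncFunction extends RichAsyncFunction<String, String> {

        private transient Connection connection;
        private transient ExecutorService executor;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "pass");
            // JDBC is blocking, so run the inserts off the operator thread.
            executor = Executors.newFixedThreadPool(4);
        }

        @Override
        public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
            CompletableFuture.runAsync(() -> {
                try (PreparedStatement stmt = connection.prepareStatement(
                         "INSERT INTO my_table (payload) VALUES (?)")) {
                    stmt.setString(1, event);
                    stmt.executeUpdate();
                    // Accepted by the DB: pass the event downstream to Kafka.
                    resultFuture.complete(Collections.singleton(event));
                } catch (Exception e) {
                    // Rejected by the DB: fail the record (or complete with an
                    // empty collection to silently drop it).
                    resultFuture.completeExceptionally(e);
                }
            }, executor);
        }

        @Override
        public void close() throws Exception {
            if (executor != null) executor.shutdown();
            if (connection != null) connection.close();
        }
    }

It could then be wired in with something like AsyncDataStream.unorderedWait(events, new DbInsertAsyncFunction(), 5, TimeUnit.SECONDS).sinkTo(kafkaSink).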
Re: Flink sink data to DB and then commit data to Kafka
Many thanks, guys!

Hi Ali, for approach 2, what is a better way to do the database inserts in
this case? Currently we simply use the JDBC SQL connector to sink to the
database.

Thanks,
Qihua
Re: Flink sink data to DB and then commit data to Kafka
Hello Qihua,

This will require you to implement and maintain your own database-insertion
logic, using any of the clients that your database and programming language
support. Bear in mind that you will lose all the optimizations Flink's
connector provides for you, and this will add to the amount of code you
will have to maintain. On the other hand, it handles the case within one
job.

If you have more control over what you can do with your database, and
latency to Kafka is not a major issue (since there will be more moving
parts), then what @Francesco Guardiani suggested is also a good approach.
You will need to maintain more systems, i.e. Debezium, but less custom
code.

Therefore, how to proceed mostly depends on your requirements and the
resources you have available.

Sincerely,

Ali Bahadir Zeybek
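[Editor's note: a minimal sketch of approach 2 with hand-rolled JDBC inserts inside a ProcessFunction, routing rejected events to a side output. The class name, connection details, and table are hypothetical placeholders.]

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Hypothetical process function: inserts each event into the DB, emits
    // accepted events on the main output and rejected ones on a side output.
    public class DbInsertProcessFunction extends ProcessFunction<String, String> {

        // Anonymous subclass so Flink can capture the type information.
        public static final OutputTag<String> FAILED =
            new OutputTag<String>("db-rejected") {};

        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "pass");
        }

        @Override
        public void processElement(String event, Context ctx, Collector<String> out) {
            try (PreparedStatement stmt = connection.prepareStatement(
                     "INSERT INTO my_table (payload) VALUES (?)")) {
                stmt.setString(1, event);
                stmt.executeUpdate();
                out.collect(event);        // accepted by the DB -> on to Kafka
            } catch (Exception e) {
                ctx.output(FAILED, event); // rejected by the DB -> side output
            }
        }

        @Override
        public void close() throws Exception {
            if (connection != null) connection.close();
        }
    }

Usage would look roughly like: SingleOutputStreamOperator<String> inserted = events.process(new DbInsertProcessFunction()); inserted.sinkTo(kafkaSink); inserted.getSideOutput(DbInsertProcessFunction.FAILED).sinkTo(deadLetterSink). Note this does synchronous, per-record inserts, which is exactly the lost batching optimization mentioned above.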
Re: Flink sink data to DB and then commit data to Kafka
Hi Ali,

Thank you so much! That is very helpful.

Thanks,
Qihua