Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation
I thought a little more about your references, Martijn, and wanted to confirm one thing: the table specifies the watermark, and the downstream view needs to check whether it wants all events or only the non-late events. Please let me know if my understanding is correct. Thanks again for your references.

Mans

On Friday, February 11, 2022, 05:02:49 PM EST, M Singh wrote:

Hi Martijn: Thanks for the reference. My understanding was that if we use a watermark, then any event with event time (in the above example) < event_time - 30 seconds will be dropped automatically. My question [1] is: will the downstream (ALL_EVENTS) view, which is selecting the events from the table, receive events which are late? If late events are dropped at the table level, then do we still need the second predicate check (ts > CURRENT_WATERMARK(ts)) to filter out late events at the view level? If the table does not drop late events, then will all downstream views etc. need to add this check (ts > CURRENT_WATERMARK(ts))? I am still not clear on whether a downstream view needs to check for late events with this predicate, or whether it will never receive late events. Thanks again for your time.

On Friday, February 11, 2022, 01:55:09 PM EST, Martijn Visser wrote:

Hi, There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this would cover your questions [1]. Best regards, Martijn

[1] https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md

On Fri, 11 Feb 2022 at 16:45, M Singh wrote:

Hi: The Flink docs (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/) indicate that CURRENT_WATERMARK(rowtime) can return null:

    Note that this function can return NULL, and you may have to consider this case.
    For example, if you want to filter out late data you can use:

    WHERE
      CURRENT_WATERMARK(ts) IS NULL
      OR ts > CURRENT_WATERMARK(ts)

I have the following questions if the table is defined with a watermark, e.g.:

    CREATE TABLE `MYEVENTS` (
      `name` STRING,
      `event_time` TIMESTAMP_LTZ(3),
      ...
      WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
    WITH (...)

1. If we define the watermark as above, will late events still be propagated to a view or table which is selecting from the MYEVENTS table:

    CREATE TEMPORARY VIEW `ALL_EVENTS` AS
    SELECT * FROM MYEVENTS;

2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are the conditions for returning null?

Thanks
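If the downstream view should exclude late rows, the predicate from the docs can be applied in the view itself; a sketch assembled from the snippets above (a hypothetical view name, not a tested job):

```sql
-- Hypothetical view keeping only rows that are not late with respect to
-- the current watermark; CURRENT_WATERMARK can be NULL before the first
-- watermark has been emitted, hence the IS NULL branch.
CREATE TEMPORARY VIEW `NON_LATE_EVENTS` AS
SELECT *
FROM MYEVENTS
WHERE CURRENT_WATERMARK(event_time) IS NULL
   OR event_time > CURRENT_WATERMARK(event_time);
```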
Re: Queryable State Deprecation
Thank you Frank and Dawid for providing the context here.

From: Frank Dekervel
Date: Friday, February 4, 2022 at 9:56 AM
To: user@flink.apache.org
Subject: Re: Queryable State Deprecation

Hello, To give an extra data point: after a not so successful experiment with faust-streaming we moved our application to Flink. Since Flink's queryable state was apparently stagnant, we implemented what was needed to sink the state to an external data store for querying. However, if queryable state was in good shape we would definitely have used it. Making sure that the state is always reflected correctly in our external system turned out to be non-trivial for a number of reasons: our state is not trivially convertible to rows in a table, and sometimes we had (due to our own bugs, but still) inconsistencies between the internal Flink state and the externally materialized state, especially after replaying from a checkpoint/savepoint after a crash (we cannot use exactly_once sinks in all occasions). Also, obviously, we could not use Flink's partitioning/parallelism to help make state querying more scalable. Greetings, Frank

On 04.02.22 14:06, Dawid Wysakowicz wrote:

Hi Karthik, The reason we deprecated it is that we lacked committers who could spend time on getting queryable state to a production-ready state. I might be speaking for myself here, but I think the main use case for queryable state is to have insight into the current state of the application for debugging purposes. If it is used for data-serving purposes, we believe it's better to sink the data into an external store, which can provide better discoverability and more user-friendly APIs for querying the results. As for debugging/tracking insights, you may try to achieve similar results with metrics.
Best, Dawid

On 01/02/2022 16:36, Jatti, Karthik wrote:

Hi, I see on the Flink roadmap that the Queryable State API is scheduled to be deprecated, but I couldn't find much information on Confluence or this mailing group's archives to understand the background as to why it's being deprecated and what would be an alternative. Any pointers to help me get some more information here would be great. Thanks, Karthik
Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation
Hi Martijn: Thanks for the reference. My understanding was that if we use a watermark, then any event with event time (in the above example) < event_time - 30 seconds will be dropped automatically. My question [1] is: will the downstream (ALL_EVENTS) view, which is selecting the events from the table, receive events which are late? If late events are dropped at the table level, then do we still need the second predicate check (ts > CURRENT_WATERMARK(ts)) to filter out late events at the view level? If the table does not drop late events, then will all downstream views etc. need to add this check (ts > CURRENT_WATERMARK(ts))? I am still not clear on whether a downstream view needs to check for late events with this predicate, or whether it will never receive late events. Thanks again for your time.

On Friday, February 11, 2022, 01:55:09 PM EST, Martijn Visser wrote:

Hi, There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this would cover your questions [1]. Best regards, Martijn

[1] https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md

On Fri, 11 Feb 2022 at 16:45, M Singh wrote:

Hi: The Flink docs (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/) indicate that CURRENT_WATERMARK(rowtime) can return null:

    Note that this function can return NULL, and you may have to consider this case.
    For example, if you want to filter out late data you can use:

    WHERE
      CURRENT_WATERMARK(ts) IS NULL
      OR ts > CURRENT_WATERMARK(ts)

I have the following questions if the table is defined with a watermark, e.g.:

    CREATE TABLE `MYEVENTS` (
      `name` STRING,
      `event_time` TIMESTAMP_LTZ(3),
      ...
      WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
    WITH (...)

1. If we define the watermark as above, will late events still be propagated to a view or table which is selecting from the MYEVENTS table:

    CREATE TEMPORARY VIEW `ALL_EVENTS` AS
    SELECT * FROM MYEVENTS;

2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are the conditions for returning null?

Thanks
Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation
Hi, There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this would cover your questions [1]. Best regards, Martijn

[1] https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md

On Fri, 11 Feb 2022 at 16:45, M Singh wrote:

> Hi:
>
> The flink docs (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
> indicate that CURRENT_WATERMARK(rowtime) can return null:
>
> Note that this function can return NULL, and you may have to consider
> this case. For example, if you want to filter out late data you can use:
>
> WHERE
>   CURRENT_WATERMARK(ts) IS NULL
>   OR ts > CURRENT_WATERMARK(ts)
>
> I have the following questions if the table is defined with a
> watermark, e.g.:
>
> CREATE TABLE `MYEVENTS` (
>   `name` STRING,
>   `event_time` TIMESTAMP_LTZ(3),
>   ...
>   WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
> WITH (...)
>
> 1. If we define the watermark as above, will late events still be
> propagated to a view or table which is selecting from the MYEVENTS table:
>
> CREATE TEMPORARY VIEW `ALL_EVENTS` AS
> SELECT * FROM MYEVENTS;
>
> 2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are
> the conditions for returning null?
>
> Thanks
Re: How to proper hashCode() for keys.
Ok I used the method suggested by Ali. The error is gone. But now I see multiple counts emitted for the same key...

    DataStream<MyEvent> slStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .uid(kafkaTopic).name(kafkaTopic)
        .setParallelism(kafkaParallelism)
        // Timestamp in GMT created here, rounded down to the closest minute.
        .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
        .uid("map-json-logs").name("map-json-logs");

    slStream.keyBy(new MinutesKeySelector())
        // Tumbling window of 1 minute.
        .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))

So below you will see a new count was emitted at 16:51 and 16:55:

    {"countId":"2022-02-11T16:50:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"mydomain.com","uri":"/some-article","count":3542}
    {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":16503}
    {"countId":"2022-02-11T16:51:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"mydomain.com","uri":"/some-article","count":70}
    {"countId":"2022-02-11T16:52:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"mydomain.com","uri":"/some-article","count":16037}
    {"countId":"2022-02-11T16:53:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"mydomain.com","uri":"/some-article","count":18679}
    {"countId":"2022-02-11T16:54:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"mydomain.com","uri":"/some-article","count":17697}
    {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":18066}
    {"countId":"2022-02-11T16:55:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"mydomain.com","uri":"/some-article","count":58}
    {"countId":"2022-02-11T16:56:00Z|mydomain.com|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"mydomain.com","uri":"/some-article","count":17489}

On Mon, Feb 7, 2022 at 12:44 PM John Smith wrote:
> Ok I think Ali's solution makes the most sense to me. I'll try it and let you know.
>
> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge wrote:
>> Hi John,
>>
>> your getKey() implementation shows that it is not deterministic, since calling it with the same click instance multiple times will return different keys. For example a call at 12:01:59.950 and a call at 12:02:00.050 with the same click instance will return two different keys:
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>
>> best regards
>> Jing
>>
>> On Mon, Feb 7, 2022 at 5:07 PM John Smith wrote:
>>> Maybe there's a misunderstanding. But basically I want to do a clickstream count for a given "url" and, for simplicity and accuracy of the count, base it on processing time (event time doesn't matter as long as I get a total of clicks at that given processing time).
>>>
>>> So regardless of the event time, I want all clicks for the current processing time rounded to the minute, per link.
>>>
>>> So, if now was 2022-04-07T12:01:00.000Z
>>>
>>> Then I would want the following result...
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>>
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>> And so on...
>>>
>>> @Override
>>> public MyEventCountKey getKey(final MyEvent click) throws Exception {
>>>     MyEventCountKey key = new MyEventCountKey(
>>>         Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>>             ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>>         click.getDomain(), // cnn.com
>>>         click.getPath()    // /some-article-name
>>>     );
>>>     return key;
>>> }
>>>
>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek wrote:
>>>> The key selector works.
>>>>
>>>> No it does not ;) It depends on the system time so it's not deterministic (you can get different keys for the very same element).
>>>>
>>>> How do you key a count based on the time. I have taken this from samples online.
>>>>
>>>> This is what the windowing is for. You basically want to group / combine elements per key and event time window [1].
>>>>
>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith wrote:
>>>>> The key selector works. It only causes an issue if there are too many keys
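Not from the thread itself: a pure-JDK sketch of the deterministic alternative Jing and David describe, deriving the time component of the key from the event's own timestamp instead of Instant.now(). Class and method names here are made up for illustration:

```java
import java.time.Duration;
import java.time.Instant;

public class WindowKey {
    // Floor an event timestamp to the start of its N-minute window.
    // Deterministic: the same event always maps to the same key,
    // unlike a key computed from Instant.now(), which changes depending
    // on when getKey() happens to run.
    public static Instant windowStart(Instant eventTime, long windowSizeMins) {
        long windowMillis = Duration.ofMinutes(windowSizeMins).toMillis();
        long floored = (eventTime.toEpochMilli() / windowMillis) * windowMillis;
        return Instant.ofEpochMilli(floored);
    }

    public static void main(String[] args) {
        // The two problem instants from Jing's example now map to stable,
        // event-derived window starts.
        System.out.println(windowStart(Instant.parse("2022-04-07T12:01:59.950Z"), 1)); // 2022-04-07T12:01:00Z
        System.out.println(windowStart(Instant.parse("2022-04-07T12:02:00.050Z"), 1)); // 2022-04-07T12:02:00Z
    }
}
```

With a key derived this way (or by keying only on domain and path and letting the tumbling window supply the time bucket), calling getKey() twice on the same element can no longer produce two different keys.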
Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation
Hi: The Flink docs (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/) indicate that CURRENT_WATERMARK(rowtime) can return null:

    Note that this function can return NULL, and you may have to consider this case.
    For example, if you want to filter out late data you can use:

    WHERE
      CURRENT_WATERMARK(ts) IS NULL
      OR ts > CURRENT_WATERMARK(ts)

I have the following questions if the table is defined with a watermark, e.g.:

    CREATE TABLE `MYEVENTS` (
      `name` STRING,
      `event_time` TIMESTAMP_LTZ(3),
      ...
      WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
    WITH (...)

1. If we define the watermark as above, will late events still be propagated to a view or table which is selecting from the MYEVENTS table:

    CREATE TEMPORARY VIEW `ALL_EVENTS` AS
    SELECT * FROM MYEVENTS;

2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are the conditions for returning null?

Thanks
table api watermarks, timestamps, outoforderness and head aches
Hi, I am getting a headache when thinking about watermarks and timestamps. My application reads events from Kafka (they are in JSON format) as a DataStream. Events can be keyed by a transactionId and have an event timestamp (handlingTime). All events belonging to a single transactionId will arrive in a window of a couple of minutes (say max 5 minutes). As soon as these 5 minutes have passed, it should calculate the differences in timestamp between the ordered events, add that elapsed time to every event and emit them to the sink.

I basically want to use the Table API to do:

    SELECT transactionId, handlingTime,
           handlingTime - LAG(handlingTime) OVER (PARTITION BY transactionId ORDER BY handlingTime) AS elapsedTime,
           originalEvent
    FROM InputTable

After the result of this query has been pushed to the sink, all data with respect to this transactionId can be discarded.

What kind of watermark do I need to use?
- bounded out of orderness?
- with idleness?
- ...

Late events can be ignored. They will rarely happen.

Regards
Hans-Peter
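For the bounded-out-of-orderness option, the watermark can be declared directly in the table DDL; a minimal sketch assuming the names from the question, with the 5-minute bound taken from the stated arrival window (connector options are placeholders, not a tested definition):

```sql
-- A sketch: bounded-out-of-orderness watermark matching the
-- "max 5 minutes" arrival window described above.
CREATE TABLE InputTable (
  transactionId STRING,
  handlingTime  TIMESTAMP(3),
  originalEvent STRING,
  WATERMARK FOR handlingTime AS handlingTime - INTERVAL '5' MINUTE
) WITH (
  'connector' = 'kafka',
  'format' = 'json'
  -- topic, bootstrap servers, etc. omitted
);
```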
Re: CDC using Query
Hi Mohan, I don't know the specifics about the single Kafka Connect worker. The Flink CDC connector is NOT a Kafka connector. As explained before, there is no Kafka involved when using this connector. As is also mentioned in the same readme, it indeed provides exactly-once processing. Best regards, Martijn

Op vr 11 feb. 2022 om 13:05 schreef mohan radhakrishnan <radhakrishnan.mo...@gmail.com>:

> Hello,
> Ok. I may not have understood the answer to my previous question.
> When I listen to https://www.youtube.com/watch?v=IOZ2Um6e430 at 20:14 he starts to talk about this.
> Is he talking about a single Kafka Connect worker or a cluster? He mentions that it is 'at-least-once'.
> So Flink's version is an improvement? So Flink's Kafka connector in a Connect cluster guarantees 'exactly-once'?
> Please bear with me.
>
> This will have other consequences too as our MQ may need an MQ connector (probably from Flink or Confluent).
> Different connectors may have different guarantees.
>
> Thanks.
>
>> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>>
>> No, both the Flink CDC Connector and the Flink Kafka Connector provide an exactly-once implementation.
>
> On Fri, Feb 11, 2022 at 1:57 PM Martijn Visser wrote:
>
>> Hi,
>>
>> The readme on the Flink CDC connectors [1] says that Oracle Databases version 11, 12, 19 are supported with Oracle Driver 19.3.0.0.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>>
>> On Fri, 11 Feb 2022 at 08:37, mohan radhakrishnan <radhakrishnan.mo...@gmail.com> wrote:
>>
>>> Thanks. I looked at it. Our primary DBs are Oracle and MySQL. The Flink CDC Connector uses Debezium, I think. So ververica doesn't have a Flink CDC Connector for Oracle?
>>>
>>> On Mon, Feb 7, 2022 at 3:03 PM Leonard Xu wrote:
>>>
>>>> Hello, mohan
>>>>
>>>> 1. Does flink have any support to track any missed source Jdbc CDC records ?
>>>>
>>>> The Flink CDC Connector provides exactly-once semantics, which means it won't miss records. Tip: the Flink JDBC Connector only scans the database once and cannot continuously read a CDC stream.
>>>>
>>>> 2. What is the equivalent of Kafka consumer groups ?
>>>>
>>>> Different databases have different CDC mechanisms; it's the serverId used to mark a slave for MySQL/MariaDB, and the slot name for PostgreSQL.
>>>>
>>>> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>>>>
>>>> No, both the Flink CDC Connector and the Flink Kafka Connector provide an exactly-once implementation.
>>>>
>>>> BTW, if your destination is Elasticsearch, the quick start demo[1] may help you.
>>>>
>>>> Best,
>>>> Leonard
>>>>
>>>> [1] https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html
>>>>
>>>> On Friday, February 4, 2022, mohan radhakrishnan <radhakrishnan.mo...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>> So the JDBC source connector is Kafka and the transformation is done by Flink (Flink SQL)? But that connector can miss records, I thought. Started looking at Flink for this and other use cases.
>>>>> Can I see the alternative to Spring Cloud Stream (Kafka Streams)? Since I am learning Flink, Kafka Streams' changelog topics, exactly-once delivery and DLQs seemed good for our critical push notifications.
>>>>>
>>>>> We also needed an Elastic sink.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Friday, February 4, 2022, Dawid Wysakowicz wrote:
>>>>>
>>>>>> Hi Mohan,
>>>>>>
>>>>>> I don't know much about Kafka Connect, so I will not talk about its features and differences to Flink. Flink on its own does not have a capability to read a CDC stream directly from a DB. However, there is the flink-cdc-connectors [1] project, which embeds the standalone Debezium engine inside Flink's source and can process a DB changelog with all the processing guarantees that Flink provides.
>>>>>>
>>>>>> As for the idea of processing further with Kafka Streams: why not process the data with Flink? What do you miss in Flink?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>>>>
>>>>>> On 04/02/2022 13:55, mohan radhakrishnan wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> When I was looking for CDC I realized Flink uses a Kafka connector to stream to Flink. The idea is to send it forward to Kafka and consume it using Kafka Streams.
>>>>>>>
>>>>>>> Are there source DLQs or additional mechanisms to detect failures to read from the DB ?
>>>>>>>
>>>>>>> We don't want to use Debezium and our CDC is based on queries.
>>>>>>>
>>>>>>> What mechanisms does Flink have that a Kafka Connect worker does not ? Kafka Connect workers can go down and source data can be lost.
>>>>>>>
>>>>>>> Does the idea to send it forward to Kafka and consume it using Kafka Streams make sense ? The checkpointing feature of Flink can help ? I plan to use Kafka Streams for 'Exactly-once Delivery' and changelog topics.
>>>>>>>
>>>>>>> Could you point out relevant material to read ?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Mohan
Re: CDC using Query
Hello,
Ok. I may not have understood the answer to my previous question.
When I listen to https://www.youtube.com/watch?v=IOZ2Um6e430 at 20:14 he starts to talk about this.
Is he talking about a single Kafka Connect worker or a cluster? He mentions that it is 'at-least-once'.
So Flink's version is an improvement? So Flink's Kafka connector in a Connect cluster guarantees 'exactly-once'?
Please bear with me.

This will have other consequences too as our MQ may need an MQ connector (probably from Flink or Confluent).
Different connectors may have different guarantees.

Thanks.

> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>
> No, both the Flink CDC Connector and the Flink Kafka Connector provide an exactly-once implementation.

On Fri, Feb 11, 2022 at 1:57 PM Martijn Visser wrote:

> Hi,
>
> The readme on the Flink CDC connectors [1] says that Oracle Databases version 11, 12, 19 are supported with Oracle Driver 19.3.0.0.
>
> Best regards,
>
> Martijn
>
> [1] https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>
> On Fri, 11 Feb 2022 at 08:37, mohan radhakrishnan <radhakrishnan.mo...@gmail.com> wrote:
>
>> Thanks. I looked at it. Our primary DBs are Oracle and MySQL. The Flink CDC Connector uses Debezium, I think. So ververica doesn't have a Flink CDC Connector for Oracle?
>>
>> On Mon, Feb 7, 2022 at 3:03 PM Leonard Xu wrote:
>>
>>> Hello, mohan
>>>
>>> 1. Does flink have any support to track any missed source Jdbc CDC records ?
>>>
>>> The Flink CDC Connector provides exactly-once semantics, which means it won't miss records. Tip: the Flink JDBC Connector only scans the database once and cannot continuously read a CDC stream.
>>>
>>> 2. What is the equivalent of Kafka consumer groups ?
>>>
>>> Different databases have different CDC mechanisms; it's the serverId used to mark a slave for MySQL/MariaDB, and the slot name for PostgreSQL.
>>>
>>> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>>>
>>> No, both the Flink CDC Connector and the Flink Kafka Connector provide an exactly-once implementation.
>>>
>>> BTW, if your destination is Elasticsearch, the quick start demo[1] may help you.
>>>
>>> Best,
>>> Leonard
>>>
>>> [1] https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html
>>>
>>> On Friday, February 4, 2022, mohan radhakrishnan <radhakrishnan.mo...@gmail.com> wrote:
>>>
>>>> Hello,
>>>> So the JDBC source connector is Kafka and the transformation is done by Flink (Flink SQL)? But that connector can miss records, I thought. Started looking at Flink for this and other use cases.
>>>> Can I see the alternative to Spring Cloud Stream (Kafka Streams)? Since I am learning Flink, Kafka Streams' changelog topics, exactly-once delivery and DLQs seemed good for our critical push notifications.
>>>>
>>>> We also needed an Elastic sink.
>>>>
>>>> Thanks
>>>>
>>>> On Friday, February 4, 2022, Dawid Wysakowicz wrote:
>>>>
>>>>> Hi Mohan,
>>>>>
>>>>> I don't know much about Kafka Connect, so I will not talk about its features and differences to Flink. Flink on its own does not have a capability to read a CDC stream directly from a DB. However, there is the flink-cdc-connectors [1] project, which embeds the standalone Debezium engine inside Flink's source and can process a DB changelog with all the processing guarantees that Flink provides.
>>>>>
>>>>> As for the idea of processing further with Kafka Streams: why not process the data with Flink? What do you miss in Flink?
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>>>
>>>>> On 04/02/2022 13:55, mohan radhakrishnan wrote:
>>>>>
>>>>>> Hi,
>>>>>> When I was looking for CDC I realized Flink uses a Kafka connector to stream to Flink. The idea is to send it forward to Kafka and consume it using Kafka Streams.
>>>>>>
>>>>>> Are there source DLQs or additional mechanisms to detect failures to read from the DB ?
>>>>>>
>>>>>> We don't want to use Debezium and our CDC is based on queries.
>>>>>>
>>>>>> What mechanisms does Flink have that a Kafka Connect worker does not ? Kafka Connect workers can go down and source data can be lost.
>>>>>>
>>>>>> Does the idea to send it forward to Kafka and consume it using Kafka Streams make sense ? The checkpointing feature of Flink can help ? I plan to use Kafka Streams for 'Exactly-once Delivery' and changelog topics.
>>>>>>
>>>>>> Could you point out relevant material to read ?
>>>>>>
>>>>>> Thanks,
>>>>>> Mohan
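As background on how "no Kafka involved" looks in practice, a Flink CDC source is declared directly as a table in Flink SQL; a minimal sketch assuming the mysql-cdc connector from the readme linked above (the table schema and all connection values are made-up placeholders):

```sql
-- Hypothetical table and connection values; only the connector name and
-- option keys come from the flink-cdc-connectors documentation.
CREATE TABLE orders (
  order_id INT,
  order_status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);
```

Rows then arrive in Flink as a changelog stream read by the embedded Debezium engine, with no Kafka Connect cluster in between.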
Removing unused flink-avro causes savepoint to fail loading
Hi, While developing a job we mistakenly imported flink-avro as a dependency and then we did some cleanups. Sadly it seems that flink-avro has registered some Kryo serializers that are now required to load the savepoints, even though we do not use the functionalities offered by this module. The error is (this is using Flink 1.12.1):

    java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_b1a2a2523a4642215643a6a4e58f0d05_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)
        ... 9 more
    Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:607)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        ... 11 more
    Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.avro.generic.GenericData$Array'
        at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:186)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:90)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
        at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
        at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:221)
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:80)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:219)
        at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:186)
        at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:161)
        at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:112)
        at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:93)
        at org.apache.flink.runtime.state.DefaultOp
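One workaround to consider (an assumption on my part, not something confirmed in this thread) is to keep flink-avro on the job's classpath while restoring, so that the Kryo serializer snapshot can resolve org.apache.avro.generic.GenericData$Array again; in Maven that would be:

```xml
<!-- Hypothetical fix: re-add the dependency whose Kryo registrations the
     savepoint still references; the version matches the Flink 1.12.1
     mentioned in the report above. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.12.1</version>
</dependency>
```

Once restored, the state could then be migrated off the Avro/Kryo types before removing the dependency again.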
Re: CDC using Query
Hi, The readme on the Flink CDC connectors [1] says that Oracle Databases version 11, 12, 19 are supported with Oracle Driver 19.3.0.0. Best regards, Martijn

[1] https://github.com/ververica/flink-cdc-connectors/blob/master/README.md

On Fri, 11 Feb 2022 at 08:37, mohan radhakrishnan <radhakrishnan.mo...@gmail.com> wrote:

> Thanks. I looked at it. Our primary DBs are Oracle and MySQL. The Flink CDC Connector uses Debezium, I think. So ververica doesn't have a Flink CDC Connector for Oracle?
>
> On Mon, Feb 7, 2022 at 3:03 PM Leonard Xu wrote:
>
>> Hello, mohan
>>
>> 1. Does flink have any support to track any missed source Jdbc CDC records ?
>>
>> The Flink CDC Connector provides exactly-once semantics, which means it won't miss records. Tip: the Flink JDBC Connector only scans the database once and cannot continuously read a CDC stream.
>>
>> 2. What is the equivalent of Kafka consumer groups ?
>>
>> Different databases have different CDC mechanisms; it's the serverId used to mark a slave for MySQL/MariaDB, and the slot name for PostgreSQL.
>>
>> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>>
>> No, both the Flink CDC Connector and the Flink Kafka Connector provide an exactly-once implementation.
>>
>> BTW, if your destination is Elasticsearch, the quick start demo[1] may help you.
>>
>> Best,
>> Leonard
>>
>> [1] https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html
>>
>> On Friday, February 4, 2022, mohan radhakrishnan <radhakrishnan.mo...@gmail.com> wrote:
>>
>>> Hello,
>>> So the JDBC source connector is Kafka and the transformation is done by Flink (Flink SQL)? But that connector can miss records, I thought. Started looking at Flink for this and other use cases.
>>> Can I see the alternative to Spring Cloud Stream (Kafka Streams)? Since I am learning Flink, Kafka Streams' changelog topics, exactly-once delivery and DLQs seemed good for our critical push notifications.
>>>
>>> We also needed an Elastic sink.
>>>
>>> Thanks
>>>
>>> On Friday, February 4, 2022, Dawid Wysakowicz wrote:
>>>
>>>> Hi Mohan,
>>>>
>>>> I don't know much about Kafka Connect, so I will not talk about its features and differences to Flink. Flink on its own does not have a capability to read a CDC stream directly from a DB. However, there is the flink-cdc-connectors [1] project, which embeds the standalone Debezium engine inside Flink's source and can process a DB changelog with all the processing guarantees that Flink provides.
>>>>
>>>> As for the idea of processing further with Kafka Streams: why not process the data with Flink? What do you miss in Flink?
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>>
>>>> On 04/02/2022 13:55, mohan radhakrishnan wrote:
>>>>
>>>>> Hi,
>>>>> When I was looking for CDC I realized Flink uses a Kafka connector to stream to Flink. The idea is to send it forward to Kafka and consume it using Kafka Streams.
>>>>>
>>>>> Are there source DLQs or additional mechanisms to detect failures to read from the DB ?
>>>>>
>>>>> We don't want to use Debezium and our CDC is based on queries.
>>>>>
>>>>> What mechanisms does Flink have that a Kafka Connect worker does not ? Kafka Connect workers can go down and source data can be lost.
>>>>>
>>>>> Does the idea to send it forward to Kafka and consume it using Kafka Streams make sense ? The checkpointing feature of Flink can help ? I plan to use Kafka Streams for 'Exactly-once Delivery' and changelog topics.
>>>>>
>>>>> Could you point out relevant material to read ?
>>>>>
>>>>> Thanks,
>>>>> Mohan