Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation

2022-02-11 Thread M Singh
 I thought a little more about your references, Martijn, and wanted to confirm 
one thing - the table specifies the watermark, and the downstream view needs 
to check whether it wants all events or only the non-late events.  Please let 
me know if my understanding is correct.  
Thanks again for your references.
Mans
On Friday, February 11, 2022, 05:02:49 PM EST, M Singh 
 wrote:  
 
  
Hi Martijn:
Thanks for the reference.   
My understanding was that if we use a watermark, then any event with an event 
time (in the above example) < event_time - 30 seconds will be dropped 
automatically.

My question [1] is: will the downstream (ALL_EVENTS) view, which selects the 
events from the table, receive events that are late? If late events are 
dropped at the table level, do we still need the second predicate check 
(ts > CURRENT_WATERMARK(ts)) to filter out late events at the view level?
If the table does not drop late events, will all downstream views etc. need to 
add this check (ts > CURRENT_WATERMARK(ts))?
I am still not clear on whether downstream views need to check for late events 
with this predicate, or whether they will never receive late events.
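
For illustration, here is a minimal sketch of the view-level check being 
discussed. It assumes the MYEVENTS table (with the event_time watermark from 
the question below) is already registered in the TableEnvironment; the view 
name is made up.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv =
    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

// Assumes MYEVENTS (declared with WATERMARK FOR event_time) exists in this environment.
// The IS NULL branch keeps rows that arrive before any watermark has been generated,
// exactly as in the filter quoted from the docs below.
tEnv.executeSql(
    "CREATE TEMPORARY VIEW NON_LATE_EVENTS AS "
  + "SELECT * FROM MYEVENTS "
  + "WHERE CURRENT_WATERMARK(event_time) IS NULL "
  + "   OR event_time > CURRENT_WATERMARK(event_time)");
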
Thanks again for your time.

On Friday, February 11, 2022, 01:55:09 PM EST, Martijn Visser 
 wrote:  
 
 Hi,
There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this would 
cover your questions [1].
Best regards,
Martijn
[1] 
https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md
On Fri, 11 Feb 2022 at 16:45, M Singh  wrote:

Hi:
The flink docs 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
 indicates that the CURRENT_WATERMARK(rowtime) can return null:

Note that this function can return NULL, and you may have to consider this 
case. For example, if you want to filter out late data you can use:
WHERE
  CURRENT_WATERMARK(ts) IS NULL
  OR ts > CURRENT_WATERMARK(ts)
I have the following questions if the table is defined with a watermark, e.g.:

CREATE TABLE `MYEVENTS` (
    `name` STRING,
    `event_time` TIMESTAMP_LTZ(3),
    ...
    WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
WITH (...)

1. If we define the watermark as above, will late events still be propagated to 
a view or table that selects from the MYEVENTS table:

CREATE TEMPORARY VIEW `ALL_EVENTS` AS
    SELECT * FROM MYEVENTS;

2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are the 
conditions for returning null?


Thanks
  




Re: Queryable State Deprecation

2022-02-11 Thread Jatti, Karthik
Thank you Frank and Dawid for providing the context here.

From: Frank Dekervel 
Date: Friday, February 4, 2022 at 9:56 AM
To: user@flink.apache.org 
Subject: Re: Queryable State Deprecation
EXTERNAL SENDER


Hello,

To give an extra data point: after a not-so-successful experiment with 
faust-streaming we moved our application to Flink. Since Flink's queryable 
state was apparently stagnant, we implemented what was needed to sink the state 
to an external data store for querying.

However, if queryable state were in good shape we would definitely have used 
it. Making sure that the state is always reflected correctly in our external 
system turned out to be non-trivial for a number of reasons: our state is not 
trivially convertible to rows in a table, and sometimes we had (due to our own 
bugs, but still) inconsistencies between the internal Flink state and the 
externally materialized state, especially after replaying from a 
checkpoint/savepoint after a crash (we cannot use exactly_once sinks in all 
cases).

Also, obviously, we could not use Flink's partitioning/parallelism to help make 
state querying more scalable.

Greetings,
Frank




On 04.02.22 14:06, Dawid Wysakowicz wrote:

Hi Karthik,

The reason we deprecated it is that we lacked committers who could spend 
time on getting Queryable State to a production-ready state. I might be 
speaking for myself here, but I think the main use case for queryable state 
is to get insight into the current state of the application for debugging 
purposes. If it is used for data-serving purposes, we believe it's better to 
sink the data into an external store, which can provide better discoverability 
and more user-friendly APIs for querying the results.

As for debugging/tracking insights, you may try to achieve similar results with 
metrics.
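
As a concrete illustration of that idea (a hypothetical sketch; the operator, 
the metric name and the "buffered elements" figure are made up, only the metric 
group API is standard):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

// Instead of exposing operator state via Queryable State, publish the interesting
// figure (here: how many elements the operator is currently holding) as a gauge,
// which any configured metrics reporter can then pick up.
public class BufferingFunction extends RichFlatMapFunction<String, String> {

    private long buffered;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext()
            .getMetricGroup()
            .gauge("bufferedElements", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return buffered;
                }
            });
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        buffered++;          // stand-in for whatever state the job actually keeps
        out.collect(value);
    }
}
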

Best,

Dawid
On 01/02/2022 16:36, Jatti, Karthik wrote:
Hi,

I see on the Flink roadmap that the Queryable State API is scheduled to be 
deprecated, but I couldn’t find much information on Confluence or this mailing 
group’s archives to understand the background as to why it’s being deprecated 
and what would be an alternative.  Any pointers to help me get some more 
information here would be great.

Thanks,
Karthik





Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation

2022-02-11 Thread M Singh
 
Hi Martijn:
Thanks for the reference.   
My understanding was that if we use a watermark, then any event with an event 
time (in the above example) < event_time - 30 seconds will be dropped 
automatically.

My question [1] is: will the downstream (ALL_EVENTS) view, which selects the 
events from the table, receive events that are late? If late events are 
dropped at the table level, do we still need the second predicate check 
(ts > CURRENT_WATERMARK(ts)) to filter out late events at the view level?
If the table does not drop late events, will all downstream views etc. need to 
add this check (ts > CURRENT_WATERMARK(ts))?
I am still not clear on whether downstream views need to check for late events 
with this predicate, or whether they will never receive late events.
Thanks again for your time.

On Friday, February 11, 2022, 01:55:09 PM EST, Martijn Visser 
 wrote:  
 
 Hi,
There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this would 
cover your questions [1].
Best regards,
Martijn
[1] 
https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md
On Fri, 11 Feb 2022 at 16:45, M Singh  wrote:

Hi:
The flink docs 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
 indicates that the CURRENT_WATERMARK(rowtime) can return null:

Note that this function can return NULL, and you may have to consider this 
case. For example, if you want to filter out late data you can use:
WHERE
  CURRENT_WATERMARK(ts) IS NULL
  OR ts > CURRENT_WATERMARK(ts)
I have the following questions if the table is defined with a watermark, e.g.:

CREATE TABLE `MYEVENTS` (
    `name` STRING,
    `event_time` TIMESTAMP_LTZ(3),
    ...
    WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
WITH (...)

1. If we define the watermark as above, will late events still be propagated to 
a view or table that selects from the MYEVENTS table:

CREATE TEMPORARY VIEW `ALL_EVENTS` AS
    SELECT * FROM MYEVENTS;

2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are the 
conditions for returning null?


Thanks
  


  

Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation

2022-02-11 Thread Martijn Visser
Hi,

There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this
would cover your questions [1].

Best regards,

Martijn

[1]
https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md

On Fri, 11 Feb 2022 at 16:45, M Singh  wrote:

> Hi:
>
> The flink docs (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
> indicates that the CURRENT_WATERMARK(rowtime) can return null:
>
> Note that this function can return NULL, and you may have to consider
> this case. For example, if you want to filter out late data you can use:
>
> WHERE
>   CURRENT_WATERMARK(ts) IS NULL
>   OR ts > CURRENT_WATERMARK(ts)
>
>
> I have the following questions if the table is defined with a
> watermark, e.g.:
>
> CREATE TABLE `MYEVENTS` (
> `name` STRING,
> `event_time` TIMESTAMP_LTZ(3),
>  ...
> WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
> WITH (...)
>
>
> 1. If we define the watermark as above, will late events still be
> propagated to a view or table that selects from the MYEVENTS table:
>
> CREATE TEMPORARY VIEW `ALL_EVENTS` AS
> SELECT * FROM MYEVENTS;
>
> 2. Can CURRENT_WATERMARK(event_time) still return null ?  If so, what are
> the conditions for returning null ?
>
>
>
> Thanks
>
>
>
>


Re: How to proper hashCode() for keys.

2022-02-11 Thread John Smith
Ok I used the method suggested by Ali. The error is gone. But now I see
multiple counts emitted for the same key...

DataStream<MyEvent> slStream = env.fromSource(kafkaSource,
        WatermarkStrategy.noWatermarks(), "Kafka Source")
    .uid(kafkaTopic).name(kafkaTopic)
    .setParallelism(kafkaParallelism)
    // Timestamp in GMT created here, rounded down to the closest minute.
    .flatMap(new MapToMyEvent("my-event", windowSizeMins, "message"))
    .uid("map-json-logs").name("map-json-logs");

// Tumbling window of 1 minute.
slStream.keyBy(new MinutesKeySelector())
    .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)))



So below you will see that a second count was emitted for 16:51 and for 16:55:

{"countId":"2022-02-11T16:50:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":"
mydomain.com","uri":"/some-article","count":3542}
-
{"countId":"2022-02-11T16:51:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
mydomain.com","uri":"/some-article","count":16503}
{"countId":"2022-02-11T16:51:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":"
mydomain.com","uri":"/some-article","count":70}
-

{"countId":"2022-02-11T16:52:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":"
mydomain.com","uri":"/some-article","count":16037}
{"countId":"2022-02-11T16:53:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":"
mydomain.com","uri":"/some-article","count":18679}
{"countId":"2022-02-11T16:54:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":"
mydomain.com","uri":"/some-article","count":17697}
-

{"countId":"2022-02-11T16:55:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
mydomain.com","uri":"/some-article","count":18066}
{"countId":"2022-02-11T16:55:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":"
mydomain.com","uri":"/some-article","count":58}
-
{"countId":"2022-02-11T16:56:00Z|mydomain.com
|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":"
mydomain.com","uri":"/some-article","count":17489}




On Mon, Feb 7, 2022 at 12:44 PM John Smith  wrote:

> Ok I think Ali's solution makes the most sense to me. I'll try it and let
> you know.
>
> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge  wrote:
>
>> Hi John,
>>
>> your getKey() implementation shows that it is not deterministic, since
>> calling it with the same click instance multiple times will return
>> different keys. For example a call at 12:01:59.950 and a call at
>> 12:02:00.050 with the same click instance will return two different keys:
>>
>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name
>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name
>>
>> best regards
>> Jing
>>
>> On Mon, Feb 7, 2022 at 5:07 PM John Smith  wrote:
>>
>>> Maybe there's a misunderstanding. But basically I want to do clickstream
>>> count for a given "url" and for simplicity and accuracy of the count base
>>> it on processing time (event time doesn't matter as long as I get a total
>>> of clicks at that given processing time)
>>>
>>> So regardless of the event time. I want all clicks for the current
>>> processing time rounded to the minute per link.
>>>
>>> So, if now was 2022-04-07T12:01:00.000Z
>>>
>>> Then I would want the following result...
>>>
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10
>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2
>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15
>>> 
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30
>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1
>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10
>>> And so on...
>>>
>>> @Override
>>> public MyEventCountKey getKey(final MyEvent click) throws Exception
>>> {
>>> MyEventCountKey key = new MyEventCountKey(
>>> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")),
>>> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(),
>>> click.getDomain(), // cnn.com
>>> click.getPath() // /some-article-name
>>> );
>>> return key;
>>> }
>>>
>>>
>>>
>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek  wrote:
>>>
 The key selector works.


 No it does not ;) It depends on the system time so it's not
 deterministic (you can get different keys for the very same element).

 How do you key a count based on the time. I have taken this from
> samples online.
>

 This is what the windowing is for. You basically want to group /
 combine elements per key and event time window [1].

 [1]
 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

 Best,
 D.

 On Mon, Feb 7, 2022 at 3:44 PM John Smith 
 wrote:

> The key selector works. It only causes an issue if there are too many keys

Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation

2022-02-11 Thread M Singh
Hi:
The flink docs 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
 indicates that the CURRENT_WATERMARK(rowtime) can return null:

Note that this function can return NULL, and you may have to consider this 
case. For example, if you want to filter out late data you can use:
WHERE
  CURRENT_WATERMARK(ts) IS NULL
  OR ts > CURRENT_WATERMARK(ts)
I have the following questions if the table is defined with a watermark, e.g.:

CREATE TABLE `MYEVENTS` (
    `name` STRING,
    `event_time` TIMESTAMP_LTZ(3),
    ...
    WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
WITH (...)

1. If we define the watermark as above, will late events still be propagated to 
a view or table that selects from the MYEVENTS table:

CREATE TEMPORARY VIEW `ALL_EVENTS` AS
    SELECT * FROM MYEVENTS;

2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are the 
conditions for returning null?


Thanks
  



table api watermarks, timestamps, outoforderness and head aches

2022-02-11 Thread HG
Hi,

I am getting a headache when thinking about watermarks and timestamps.
My application reads events from Kafka (they are in JSON format) as a
DataStream.
Events can be keyed by a transactionId and have an event timestamp
(handlingTime).

All events belonging to a single transactionId will arrive within a window of a
couple of minutes (say max 5 minutes).
As soon as these 5 minutes have passed, it should calculate the differences in
timestamp between the ordered events, add that elapsed time to every event,
and emit them to the sink.

I basically want to use the table api to do
"SELECT transactionId, handlingTime, handlingTime - lag(handlingTime) over
(partition by transactionId order by handlingTime) as elapsedTime,
originalEvent FROM InputTable"

After the result of this query has been pushed to the Sink all data with
respect to this transactionId can be discarded.

What kind of watermark do I need to use?
- bounded out of orderness?
- with idleness?
- ...

Late events can be ignored. They will rarely happen.
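
In case a sketch helps (assuming a POJO Event with a getHandlingTime() accessor
returning epoch milliseconds; the idleness value is an arbitrary example), a
bounded-out-of-orderness strategy matching the 5-minute bound you describe
would look like:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// The watermark trails the largest handlingTime seen so far by 5 minutes, so a
// transactionId's events are treated as complete once the watermark passes them.
// withIdleness keeps the watermark advancing when a Kafka partition goes quiet.
WatermarkStrategy<Event> strategy =
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
        .withTimestampAssigner((event, recordTimestamp) -> event.getHandlingTime())
        .withIdleness(Duration.ofMinutes(1));

// e.g. env.fromSource(kafkaSource, strategy, "transactions")
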

Regards Hans-Peter


Re: CDC using Query

2022-02-11 Thread Martijn Visser
Hi Mohan,

I don't know the specifics about the single Kafka Connect worker.

The Flink CDC connector is NOT a Kafka Connector. As explained before,
there is no Kafka involved when using this connector. As is also mentioned
in the same readme, it indeed provides exactly-once processing.
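
For reference, a minimal sketch of what that looks like with the SQL connector
from the flink-cdc-connectors project (the table, columns and connection values
are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Reads the MySQL binlog directly through the embedded Debezium engine;
        // no Kafka topic is involved.
        tEnv.executeSql(
            "CREATE TABLE orders ("
          + "  order_id INT,"
          + "  customer STRING,"
          + "  PRIMARY KEY (order_id) NOT ENFORCED"
          + ") WITH ("
          + "  'connector' = 'mysql-cdc',"
          + "  'hostname' = 'localhost',"
          + "  'port' = '3306',"
          + "  'username' = 'flinkuser',"
          + "  'password' = 'secret',"
          + "  'database-name' = 'shop',"
          + "  'table-name' = 'orders')");

        // Further processing stays inside Flink, e.g. a changelog-aware aggregation.
        tEnv.executeSql("SELECT customer, COUNT(*) AS cnt FROM orders GROUP BY customer").print();
    }
}
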

Best regards,

Martijn

Op vr 11 feb. 2022 om 13:05 schreef mohan radhakrishnan <
radhakrishnan.mo...@gmail.com>

> Hello,
>   Ok. I may not have understood the answer to my previous
> question.
> When I listen to https://www.youtube.com/watch?v=IOZ2Um6e430 at 20:14 he
> starts to talk about this.
> Is he talking about a single Kafka Connect worker or a cluster ? He
> mentions that it is 'atleast-once'.
> So Flink's version is an improvement ? So Flink's Kafka Connector in a
> Connect cluster guarantees 'Exactly-once' ?
> Please bear with me.
>
> This will have other consequences too as our MQ may need a MQ connector.(
> Probably from Flink or Confluent  )
> Different connectors may have different guarantees.
>
> Thanks.
>
>> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>>
>>
>> No, both Flink CDC Connector and Flink Kafka Connector provide exactly
>> once implementation.
>>
>
>
>
>
>
>
> On Fri, Feb 11, 2022 at 1:57 PM Martijn Visser 
> wrote:
>
>> Hi,
>>
>> The readme on the Flink CDC connectors [1] say that Oracle Databases
>> version 11, 12, 19 are supported with Oracle Driver 19.3.0.0.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>>
>> On Fri, 11 Feb 2022 at 08:37, mohan radhakrishnan <
>> radhakrishnan.mo...@gmail.com> wrote:
>>
>>> Thanks. I looked at it. Our primary DB is Oracle and MySql. Flink CDC
>>> Connector uses Debezium. I think. So ververica doesn't have a Flink CDC
>>> Connector for Oracle ?
>>>
>>> On Mon, Feb 7, 2022 at 3:03 PM Leonard Xu  wrote:
>>>
 Hello, mohan

 1. Does flink have any support to track any missed source Jdbc CDC
 records ?


 Flink CDC Connector provides Exactly once semantics which means they
 won’t miss records. Tips: The Flink JDBC Connector only
 Scan the database once which can not continuously read CDC stream.

 2. What is the equivalent of Kafka consumer groups ?


 Different database has different CDC mechanism, it’s serverId which
 used to mark a slave for MySQL/MariaDB, it’s slot name for PostgresSQL.


 3. Delivering to kafka from flink is not exactly once. Is that right ?


 No, both Flink CDC Connector and Flink Kafka Connector provide exactly
 once implementation.

 BTW, if your destination is Elasticsearch, the quick start demo[1] may
 help you.

 Best,
 Leonard

 [1]
 https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html



 Thanks

 On Friday, February 4, 2022, mohan radhakrishnan <
 radhakrishnan.mo...@gmail.com> wrote:

> Hello,
>So the jdbc source connector is  kafka and
> transformation is done by flink (flink sql) ? But that connector can miss
> records. I thought. Started looking at flink for this and other use cases.
> Can I see the alternative to spring cloudstreams( kafka streams )?
> Since I am learning flink, kafka streams' changelog topics and 
> exactly-once
> delivery and dlqs seemed good for our cŕitical push notifications.
>
> We also needed a  elastic  sink.
>
> Thanks
>
> On Friday, February 4, 2022, Dawid Wysakowicz 
> wrote:
>
>> Hi Mohan,
>>
>> I don't know much about Kafka Connect, so I will not talk about its
>> features and differences to Flink. Flink on its own does not have a
>> capability to read a CDC stream directly from a DB. However there is the
>> flink-cdc-connectors[1] projects which embeds the standalone Debezium
>> engine inside of Flink's source and can process DB changelog with all
>> processing guarantees that Flink provides.
>>
>> As for the idea of processing further with Kafka Streams. Why not
>> process data with Flink? What do you miss in Flink?
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> On 04/02/2022 13:55, mohan radhakrishnan wrote:
>>
>>> Hi,
>>>  When I was looking for CDC I realized Flink uses Kafka
>>> Connector to stream to Flink. The idea is to send it forward to Kafka 
>>> and
>>> consume it using Kafka Streams.
>>>
>>> Are there source DLQs or additional mechanisms to detect failures to
>>> read from the DB ?
>>>
>>> We don't want to use Debezium and our CDC is based on queries.
>>>
>>> What mechanisms does Flink have that a Kafka Connect worker does not
>>> ? Kafka Connect workers can go down and source data can be lost.
>>>
>>> Does th

Re: CDC using Query

2022-02-11 Thread mohan radhakrishnan
Hello,
  Ok. I may not have understood the answer to my previous
question.
When I listen to https://www.youtube.com/watch?v=IOZ2Um6e430 at 20:14 he
starts to talk about this.
Is he talking about a single Kafka Connect worker or a cluster? He
mentions that it is 'at-least-once'.
So Flink's version is an improvement? So Flink's Kafka Connector in a
Connect cluster guarantees 'exactly-once'?
Please bear with me.

This will have other consequences too, as our MQ may need an MQ connector
(probably from Flink or Confluent).
Different connectors may have different guarantees.

Thanks.

> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>
>
> No, both Flink CDC Connector and Flink Kafka Connector provide exactly
> once implementation.
>






On Fri, Feb 11, 2022 at 1:57 PM Martijn Visser 
wrote:

> Hi,
>
> The readme on the Flink CDC connectors [1] say that Oracle Databases
> version 11, 12, 19 are supported with Oracle Driver 19.3.0.0.
>
> Best regards,
>
> Martijn
>
> [1]
> https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>
> On Fri, 11 Feb 2022 at 08:37, mohan radhakrishnan <
> radhakrishnan.mo...@gmail.com> wrote:
>
>> Thanks. I looked at it. Our primary DB is Oracle and MySql. Flink CDC
>> Connector uses Debezium. I think. So ververica doesn't have a Flink CDC
>> Connector for Oracle ?
>>
>> On Mon, Feb 7, 2022 at 3:03 PM Leonard Xu  wrote:
>>
>>> Hello, mohan
>>>
>>> 1. Does flink have any support to track any missed source Jdbc CDC
>>> records ?
>>>
>>>
>>> Flink CDC Connector provides Exactly once semantics which means they
>>> won’t miss records. Tips: The Flink JDBC Connector only
>>> Scan the database once which can not continuously read CDC stream.
>>>
>>> 2. What is the equivalent of Kafka consumer groups ?
>>>
>>>
>>> Different database has different CDC mechanism, it’s serverId which used
>>> to mark a slave for MySQL/MariaDB, it’s slot name for PostgresSQL.
>>>
>>>
>>> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>>>
>>>
>>> No, both Flink CDC Connector and Flink Kafka Connector provide exactly
>>> once implementation.
>>>
>>> BTW, if your destination is Elasticsearch, the quick start demo[1] may
>>> help you.
>>>
>>> Best,
>>> Leonard
>>>
>>> [1]
>>> https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html
>>>
>>>
>>>
>>> Thanks
>>>
>>> On Friday, February 4, 2022, mohan radhakrishnan <
>>> radhakrishnan.mo...@gmail.com> wrote:
>>>
 Hello,
So the jdbc source connector is  kafka and
 transformation is done by flink (flink sql) ? But that connector can miss
 records. I thought. Started looking at flink for this and other use cases.
 Can I see the alternative to spring cloudstreams( kafka streams )?
 Since I am learning flink, kafka streams' changelog topics and exactly-once
 delivery and dlqs seemed good for our cŕitical push notifications.

 We also needed a  elastic  sink.

 Thanks

 On Friday, February 4, 2022, Dawid Wysakowicz 
 wrote:

> Hi Mohan,
>
> I don't know much about Kafka Connect, so I will not talk about its
> features and differences to Flink. Flink on its own does not have a
> capability to read a CDC stream directly from a DB. However there is the
> flink-cdc-connectors[1] projects which embeds the standalone Debezium
> engine inside of Flink's source and can process DB changelog with all
> processing guarantees that Flink provides.
>
> As for the idea of processing further with Kafka Streams. Why not
> process data with Flink? What do you miss in Flink?
>
> Best,
>
> Dawid
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> On 04/02/2022 13:55, mohan radhakrishnan wrote:
>
>> Hi,
>>  When I was looking for CDC I realized Flink uses Kafka Connector
>> to stream to Flink. The idea is to send it forward to Kafka and consume 
>> it
>> using Kafka Streams.
>>
>> Are there source DLQs or additional mechanisms to detect failures to
>> read from the DB ?
>>
>> We don't want to use Debezium and our CDC is based on queries.
>>
>> What mechanisms does Flink have that a Kafka Connect worker does not
>> ? Kafka Connect workers can go down and source data can be lost.
>>
>> Does the idea  to send it forward to Kafka and consume it using Kafka
>> Streams make sense ? The checkpointing feature of Flink can help ? I plan
>> to use Kafka Streams for 'Exactly-once Delivery' and changelog topics.
>>
>> Could you point out relevant material to read ?
>>
>> Thanks,
>> Mohan
>>
>
>>>


Removing unused flink-avro causes savepoint to fail loading

2022-02-11 Thread David Causse
Hi,

While developing a job we mistakenly imported flink-avro as a dependency
and later cleaned it up. Sadly, it seems that flink-avro had registered
some Kryo serializers that are now required to load the savepoints, even
though we do not use the functionality offered by this module.
The error is (this is using Flink 1.12.1):

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_b1a2a2523a4642215643a6a4e58f0d05_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:607)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key
'org.apache.avro.generic.GenericData$Array'
at
org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:186)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:90)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
at
java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at
java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
at
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:221)
at
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:80)
at
org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:219)
at
org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:186)
at
org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:161)
at
org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:112)
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:93)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOp

Re: CDC using Query

2022-02-11 Thread Martijn Visser
Hi,

The readme on the Flink CDC connectors [1] says that Oracle Databases
versions 11, 12 and 19 are supported with Oracle Driver 19.3.0.0.

Best regards,

Martijn

[1] https://github.com/ververica/flink-cdc-connectors/blob/master/README.md

On Fri, 11 Feb 2022 at 08:37, mohan radhakrishnan <
radhakrishnan.mo...@gmail.com> wrote:

> Thanks. I looked at it. Our primary DB is Oracle and MySql. Flink CDC
> Connector uses Debezium. I think. So ververica doesn't have a Flink CDC
> Connector for Oracle ?
>
> On Mon, Feb 7, 2022 at 3:03 PM Leonard Xu  wrote:
>
>> Hello, mohan
>>
>> 1. Does flink have any support to track any missed source Jdbc CDC
>> records ?
>>
>>
>> Flink CDC Connector provides Exactly once semantics which means they
>> won’t miss records. Tips: The Flink JDBC Connector only
>> Scan the database once which can not continuously read CDC stream.
>>
>> 2. What is the equivalent of Kafka consumer groups ?
>>
>>
>> Different database has different CDC mechanism, it’s serverId which used
>> to mark a slave for MySQL/MariaDB, it’s slot name for PostgresSQL.
>>
>>
>> 3. Delivering to kafka from flink is not exactly once. Is that right ?
>>
>>
>> No, both Flink CDC Connector and Flink Kafka Connector provide exactly
>> once implementation.
>>
>> BTW, if your destination is Elasticsearch, the quick start demo[1] may
>> help you.
>>
>> Best,
>> Leonard
>>
>> [1]
>> https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html
>>
>>
>>
>> Thanks
>>
>> On Friday, February 4, 2022, mohan radhakrishnan <
>> radhakrishnan.mo...@gmail.com> wrote:
>>
>>> Hello,
>>>So the jdbc source connector is  kafka and transformation
>>> is done by flink (flink sql) ? But that connector can miss records. I
>>> thought. Started looking at flink for this and other use cases.
>>> Can I see the alternative to spring cloudstreams( kafka streams )? Since
>>> I am learning flink, kafka streams' changelog topics and exactly-once
>>> delivery and dlqs seemed good for our cŕitical push notifications.
>>>
>>> We also needed a  elastic  sink.
>>>
>>> Thanks
>>>
>>> On Friday, February 4, 2022, Dawid Wysakowicz 
>>> wrote:
>>>
 Hi Mohan,

 I don't know much about Kafka Connect, so I will not talk about its
 features and differences to Flink. Flink on its own does not have a
 capability to read a CDC stream directly from a DB. However there is the
 flink-cdc-connectors[1] projects which embeds the standalone Debezium
 engine inside of Flink's source and can process DB changelog with all
 processing guarantees that Flink provides.

 As for the idea of processing further with Kafka Streams. Why not
 process data with Flink? What do you miss in Flink?

 Best,

 Dawid

 [1] https://github.com/ververica/flink-cdc-connectors

 On 04/02/2022 13:55, mohan radhakrishnan wrote:

> Hi,
>  When I was looking for CDC I realized Flink uses Kafka Connector
> to stream to Flink. The idea is to send it forward to Kafka and consume it
> using Kafka Streams.
>
> Are there source DLQs or additional mechanisms to detect failures to
> read from the DB ?
>
> We don't want to use Debezium and our CDC is based on queries.
>
> What mechanisms does Flink have that a Kafka Connect worker does not ?
> Kafka Connect workers can go down and source data can be lost.
>
> Does the idea  to send it forward to Kafka and consume it using Kafka
> Streams make sense ? The checkpointing feature of Flink can help ? I plan
> to use Kafka Streams for 'Exactly-once Delivery' and changelog topics.
>
> Could you point out relevant material to read ?
>
> Thanks,
> Mohan
>

>>