RE: Streaming SQL support for redis streaming connector

2021-09-16 Thread Osada Paranaliyanage
Thanks, will have a look through!

-Original Message-
From: Yangze Guo 
Sent: Wednesday, September 15, 2021 11:25 AM
To: Osada Paranaliyanage 
Cc: David Morávek ; user@flink.apache.org
Subject: Re: Streaming SQL support for redis streaming connector


> What about the bahir streaming connectors? Are they considered canonical? 
> Would they be merged into the main project at some point? IIUC we can use 
> the Table API etc. to write data to Redis using that, right?

I think it can still be used with Flink 1.12 and 1.13. However, it has not been 
updated for 5 months and is not implemented based on the new interfaces 
introduced in FLIP-95. AFAIK, the community does not plan to merge it.

> What more will be required for us to use SQL? (as in specify the
> connector in the WITH clause as redis)

To use the bahir connector with SQL, you need to build the project, move the uber 
jar into the lib directory of your Flink distribution, and fill in the necessary 
properties [1][2].

[1] 
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
[2] 
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
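
For illustration, here is a rough, untested sketch of what the wiring could look
like once the uber jar is in lib. Everything in it is an assumption to verify
against the factory and validator linked above: the legacy 'connector.type'
descriptor style, the 'redis-mode' and 'command' keys, and the made-up table schema.

// Rough sketch only -- property keys, values, and schema are placeholders to be
// checked against RedisTableSinkFactory/RedisValidator [1][2].
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RedisSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Legacy (pre-FLIP-95) factories are matched on 'connector.type' rather than 'connector'.
        tEnv.executeSql(
                "CREATE TABLE redis_sink (" +
                "  k STRING," +
                "  v STRING" +
                ") WITH (" +
                "  'connector.type' = 'redis'," +
                "  'redis-mode' = 'single'," +     // placeholder mode
                "  'command' = 'SET'" +            // placeholder write command
                ")");

        // Assumes a table 'some_source' has already been registered elsewhere.
        tEnv.executeSql("INSERT INTO redis_sink SELECT id, name FROM some_source");
    }
}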


Best,
Yangze Guo

On Wed, Sep 15, 2021 at 11:34 AM Osada Paranaliyanage 
 wrote:
>
> Hi David,
>
>
>
>   What about the bahir streaming connectors? Are they considered
> canonical? Would they be merged into the main project at some point?
> IIUC we can use the Table API etc. to write data to Redis using that, right?
> What more will be required for us to use SQL? (as in specify the
> connector in the WITH clause as redis)
>
>
>
> Thanks,
>
> Osada.
>
>
>
> From: David Morávek 
> Sent: Tuesday, September 14, 2021 7:53 PM
> To: Osada Paranaliyanage 
> Cc: user@flink.apache.org
> Subject: Re: Streaming SQL support for redis streaming connector
>
>
>
>
> Hi Osada,
>
>
>
> in theory, building a Redis table from a "CDC stream" should definitely be 
> doable. Unfortunately, Flink currently doesn't have an official Redis sink 
> for the Table API, and there is no ongoing effort to add one, so 
> it would need to be implemented first. The resulting syntax would be pretty 
> much the same as what's outlined in the mentioned example.
>
>
>
> Best,
>
> D.
>
>
>
> On Tue, Sep 14, 2021 at 2:57 PM Osada Paranaliyanage 
>  wrote:
>
> Hi All, We are looking to use Flink to build a materialized view of a 
> relational db and a document db using CDC streams. For this purpose we would 
> like to use Redis for hosting the materialized view. Can we do this in 
> streaming SQL? We have worked through 
> https://github.com/ververica/flink-sql-CDC and can see how this will work 
> with ES as a sink. But can we use Redis as the sink? Where do we find the 
> syntax for that?
>
>
>
> Thanks,
>
> Osada.
>







RE: Streaming SQL support for redis streaming connector

2021-09-16 Thread Osada Paranaliyanage
Hi Leonard,

  That’s awesome news. We are actually using documentdb. Any idea how much work 
it will be to make it work with documentdb instead?

Thanks,
Osada.

From: Leonard Xu 
Sent: Wednesday, September 15, 2021 1:08 PM
To: Osada Paranaliyanage 
Cc: user@flink.apache.org
Subject: Re: Streaming SQL support for redis streaming connector


Hi, Osada

Just want to offer some material here. The flink-cdc-connectors project [1] may 
also help you; we recently added support for the document db MongoDB [2].

Best,
Leonard

[1] https://github.com/ververica/flink-cdc-connectors
[2] 
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html
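
To make that pointer a bit more concrete, below is a rough sketch of how such a
CDC source could be declared. The option keys ('hosts', 'username', 'password',
'database', 'collection') and the schema are illustrative guesses; the
authoritative option list is in the mongodb-cdc documentation [2].

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoCdcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder schema and connection options -- verify the exact option
        // names and requirements against the mongodb-cdc connector docs [2].
        tEnv.executeSql(
                "CREATE TABLE products (" +
                "  _id STRING," +
                "  name STRING," +
                "  PRIMARY KEY (_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mongodb-cdc'," +
                "  'hosts' = 'localhost:27017'," +
                "  'username' = 'flink'," +
                "  'password' = 'secret'," +
                "  'database' = 'inventory'," +
                "  'collection' = 'products'" +
                ")");

        // The changelog can then be consumed like any other dynamic table.
        tEnv.executeSql("SELECT * FROM products").print();
    }
}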


在 2021年9月14日,20:56,Osada Paranaliyanage 
mailto:osada.paranaliyan...@dialog.lk>> 写道:

Hi All, We are looking to use flink to build a materialized view of a relation 
db and a document db using cdc streams. For this purpose we would like to use 
redis for hosting the materialized view. Can we do this in streaming SQL? We 
have worked through https://github.com/ververica/flink-sql-CDC and can see how 
this will work with ES as a sink. But can we use redis as the sink? Where do we 
find the syntax for that?

Thanks,
Osada.













Re: RocksDB state not cleaned up

2021-09-16 Thread tao xiao
Hi David,

I confirmed with the RocksDB log that Stephan's observation is the root cause:
compaction doesn't clean up the high-level SST files fast enough. Do you
think manual cleanup by registering a timer is the way to go, or can any
RocksDB parameter be tuned to mitigate this issue?
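
For concreteness, a minimal sketch of the timer-based cleanup idea (class name,
key/value types, and the state name are placeholders, not the job's actual
classes): register a processing-time timer when the entry is first written and
clear() the state when it fires.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ManualTtlCleanup extends KeyedProcessFunction<String, String, String> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // 1 day, matching the TTL config

    private transient ValueState<String> clickState;

    @Override
    public void open(Configuration parameters) {
        clickState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (clickState.value() == null) {
            clickState.update(value);
            // Schedule explicit cleanup instead of relying only on compaction-based TTL.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TTL_MS);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Actively delete the entry for the current key; the resulting tombstone is
        // cheap to compact away, independent of the TTL compaction filter.
        clickState.clear();
    }
}

The trade-off is one extra timer per key, and the timers themselves also live in state.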

On Wed, Sep 15, 2021 at 12:10 AM tao xiao  wrote:

> Hi David,
>
> If I read Stephan's comment correctly, TTL doesn't work well for cases
> where we have too many levels, like fast-growing state, as compaction
> doesn't clean up high-level SST files in time. Is this correct? If yes,
> should we register a timer with the TTL time and manually clean up the state
> (state.clear()) when the timer fires?
>
> I will turn on RocksDB logging as well as compaction logging [1] to verify
> this
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>
>
> On Tue, Sep 14, 2021 at 5:38 PM David Morávek  wrote:
>
>> Hi Tao,
>>
>> my intuition is that the compaction of SST files is not triggering. By
>> default, it's only triggered by the size ratios of different levels [1] and
>> the TTL mechanism has no effect on it.
>>
>> Some reasoning from Stephan:
>>
>> It's very likely to have large files in higher levels that haven't been
>>> compacted in a long time and thus just stay around.
>>>
>>> This might be especially possible if you insert a lot in the beginning
>>> (build up many levels) and then have a moderate rate of modifications, so
>>> the changes and expiration keep happening purely in the merges /
>>> compactions of the first levels. Then the later levels may stay unchanged
>>> for quite some time.
>>>
>>
>> You should be able to see compaction details by setting RocksDB logging
>> to INFO [2]. Can you please check these and validate whether this really is
>> the case?
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>> [2]
>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>
>> Best,
>> D.
>>
>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao  wrote:
>>
>>> Hi team
>>>
>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>> value state doesn't exist and we never update it again after the state is
>>> not empty. The key of the value state is timestamp. My understanding of
>>> such TTL settings is that the size of all SST files remains flat (let's
>>> disregard the impact space amplification brings) after 1 day as the daily
>>> data volume is more or less the same. However, the RocksDB native metrics
>>> show that the SST files have continued to grow since I started the job. I checked
>>> the SST files in local storage and I can see SST files that are 1 month old
>>> (from when I started the job). What is the possible reason for the SST files
>>> not being cleaned up?
>>>
>>> The Flink version is 1.12.1
>>> State backend is RocksDB with incremental checkpoint
>>> All default configuration for RocksDB
>>> Per job mode in Yarn and checkpoint to S3
>>>
>>>
>>> Here is the code to set value state
>>>
>>> public void open(Configuration parameters) {
>>> StateTtlConfig ttlConfigClick = StateTtlConfig
>>> .newBuilder(Time.days(1))
>>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>> 
>>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>> .cleanupInRocksdbCompactFilter(300_000)
>>> .build();
>>> ValueStateDescriptor<Click> clickStateDescriptor = new 
>>> ValueStateDescriptor<>("click", Click.class);
>>> clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>> clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>
>>> StateTtlConfig ttlConfigAds = StateTtlConfig
>>> .newBuilder(Time.days(1))
>>> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>> 
>>> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>> .cleanupInRocksdbCompactFilter(30_000_000)
>>> .build();
>>> ValueStateDescriptor<A> adsStateDescriptor = new 
>>> ValueStateDescriptor<>("ads", slimAdsClass);
>>> adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>> adsState = getRuntimeContext().getState(adsStateDescriptor);
>>> }
>>>
>>> @Override
>>> public void processElement(Tuple3 tuple, Context ctx, 
>>> Collector collector) throws Exception {
>>> if (tuple.f1 != null) {
>>> Click click = tuple.f1;
>>>
>>> if (clickState.value() != null) {
>>> return;
>>> }
>>>
>>> clickState.update(click);
>>>
>>> A adsFromState = adsState.value();
>>> if (adsFromState != null) {
>>> collector.collect(adsFromState);
>>> }
>>> } else {
>>> A ads = tuple.f2;
>>>
>>> if (adsState.value() != null) {
>>> return;
>>> }
>>

Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

2021-09-16 Thread Lars Skjærven
Thanks for the feedback.

> May I ask why you have fewer partitions than the parallelism? I would be
> happy to learn more about your use-case to better understand the
> motivation.

The use case is that topic A contains just a few messages with product
metadata that rarely get updated, while topic B contains user interactions
with the products (and many more messages). For topic A we thought that one
partition would be sufficient to hold the metadata, while we have 32
partitions for topic B. Due to the load on topic B, we use a parallelism
of 2-8.

Thanks,
Lars



On Thu, Sep 16, 2021 at 9:09 AM Fabian Paul 
wrote:

> Hi all,
>
> The problem you are seeing, Lars, is somewhat intended behaviour,
> unfortunately. With the batch/stream unification, every Kafka partition is
> treated as a kind of workload assignment. If one subtask receives a signal
> that there is no workload anymore, it goes into the FINISHED state.
> As already pointed out, this restriction will be lifted in the near future.
>
> I went through the code and I think in your case you can set the
> following configuration option [1], which should give the same behaviour as
> the old source. This will prevent the enumerator from sending a final signal
> to the subtasks, so they will not go into the FINISHED state.
>
> May I ask why you have fewer partitions than the parallelism? I would be
> happy to learn more about your use-case to better understand the
> motivation.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery


Re: Fast serialization for Kotlin data classes

2021-09-16 Thread Matthias Pohl
True, that's a valid concern you raised here, Alexis. Thanks for pointing
that out.

On Thu, Sep 16, 2021 at 1:58 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Someone please correct me if I’m wrong but, until FLINK-16686 [1] is
> fixed, a class must be a POJO to be used in managed state with RocksDB,
> right? That’s not to say that the approach with TypeInfoFactory won’t work,
> just that even then it will mean none of the data classes can be used for
> managed state.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16686
>
>
>
> Regards,
>
> Alexis.
>
>
>
> From: Matthias Pohl 
> Sent: Thursday, 16 September 2021 13:12
> To: Alex Cruise 
> Cc: Flink ML 
> Subject: Re: Fast serialization for Kotlin data classes
>
>
>
> Hi Alex,
>
> have you had a look at TypeInfoFactory? That might be the best way to come
> up with a custom serialization mechanism. See the docs [1] for further
> details.
>
>
>
> Best,
> Matthias
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
>
>
> On Tue, Sep 14, 2021 at 8:33 PM Alex Cruise  wrote:
>
> Hi there,
>
>
>
> I appreciate the fact that Flink has built-in support for making POJO and
> Scala `case class` serialization faster, but in my project we use immutable
> Kotlin `data class`es (analogous to Scala `case class`es) extensively, and
> we'd really prefer not to make them POJOs, mostly for style/taste reasons
> (e.g. need a default constructor and setters, both are anathema!)
>
>
>
> Does anyone know of a good way for us to keep using idiomatic, immutable
> Kotlin data classes, but to get much faster serialization performance in
> Flink?
>
>
>
> Thanks!
>
>
>
> -0xe1a
>
>


RE: Fast serialization for Kotlin data classes

2021-09-16 Thread Alexis Sarda-Espinosa
Someone please correct me if I’m wrong but, until FLINK-16686 [1] is fixed, a 
class must be a POJO to be used in managed state with RocksDB, right? That’s 
not to say that the approach with TypeInfoFactory won’t work, just that even 
then it will mean none of the data classes can be used for managed state.

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

From: Matthias Pohl 
Sent: Thursday, 16 September 2021 13:12
To: Alex Cruise 
Cc: Flink ML 
Subject: Re: Fast serialization for Kotlin data classes

Hi Alex,
have you had a look at TypeInfoFactory? That might be the best way to come up 
with a custom serialization mechanism. See the docs [1] for further details.

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Sep 14, 2021 at 8:33 PM Alex Cruise 
<a...@cluonflux.com> wrote:
Hi there,

I appreciate the fact that Flink has built-in support for making POJO and Scala 
`case class` serialization faster, but in my project we use immutable Kotlin 
`data class`es (analogous to Scala `case class`es) extensively, and we'd really 
prefer not to make them POJOs, mostly for style/taste reasons (e.g. need a 
default constructor and setters, both are anathema!)

Does anyone know of a good way for us to keep using idiomatic, immutable Kotlin 
data classes, but to get much faster serialization performance in Flink?

Thanks!

-0xe1a


Re: Fast serialization for Kotlin data classes

2021-09-16 Thread Matthias Pohl
Hi Alex,
have you had a look at TypeInfoFactory? That might be the best way to come
up with a custom serialization mechanism. See the docs [1] for further
details.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
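
To make the wiring concrete, here is a small sketch in Java (names are
illustrative; the GenericTypeInfo returned at the end is just a compilable
placeholder for the hand-written TypeInformation/TypeSerializer you would
actually plug in):

import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

// Stand-in for the (Kotlin) data class; the annotation points Flink at the factory.
@TypeInfo(EventTypeInfoFactory.class)
class Event {
    public final long id;
    public final String name;

    public Event(long id, String name) {
        this.id = id;
        this.name = name;
    }
}

class EventTypeInfoFactory extends TypeInfoFactory<Event> {
    @Override
    public TypeInformation<Event> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Placeholder: return a custom TypeInformation backed by your own fast
        // TypeSerializer here. GenericTypeInfo falls back to Kryo and is only
        // used to keep this sketch compilable.
        return new GenericTypeInfo<>(Event.class);
    }
}

Once the annotation is on the class, the TypeExtractor consults the factory
wherever that type shows up, so no per-job registration is needed.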

On Tue, Sep 14, 2021 at 8:33 PM Alex Cruise  wrote:

> Hi there,
>
> I appreciate the fact that Flink has built-in support for making POJO and
> Scala `case class` serialization faster, but in my project we use immutable
> Kotlin `data class`es (analogous to Scala `case class`es) extensively, and
> we'd really prefer not to make them POJOs, mostly for style/taste reasons
> (e.g. need a default constructor and setters, both are anathema!)
>
> Does anyone know of a good way for us to keep using idiomatic, immutable
> Kotlin data classes, but to get much faster serialization performance in
> Flink?
>
> Thanks!
>
> -0xe1a
>


Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

2021-09-16 Thread Fabian Paul
Hi all,

The problem you are seeing, Lars, is somewhat intended behaviour, unfortunately. 
With the batch/stream unification, every Kafka partition is treated 
as a kind of workload assignment. If one subtask receives a signal that there is 
no workload anymore, it goes into the FINISHED state.
As already pointed out, this restriction will be lifted in the near future.

I went through the code and I think in your case you can set the 
following configuration option [1], which should give the same behaviour as the
old source. This will prevent the enumerator from sending a final signal to the 
subtasks, so they will not go into the FINISHED state.

May I ask why you have fewer partitions than the parallelism? I would be happy 
to learn more about your use-case to better understand the
motivation.

Best,
Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery
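
For reference, a sketch of where that property goes on the new source (broker
address, topic, group id, and the 10s interval are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionDiscoveryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")          // placeholder address
                .setTopics("topic-a")                        // the low-partition-count topic
                .setGroupId("materialized-view-job")         // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Periodic partition discovery keeps the enumerator from sending the final
                // "no more splits" signal, so subtasks without a partition stay RUNNING.
                .setProperty("partition.discovery.interval.ms", "10000")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "topic-a")
                .print();
        env.execute();
    }
}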