RE: Streaming SQL support for redis streaming connector
Thanks, will have a look through!

-----Original Message-----
From: Yangze Guo
Sent: Wednesday, September 15, 2021 11:25 AM
To: Osada Paranaliyanage
Cc: David Morávek; user@flink.apache.org
Subject: Re: Streaming SQL support for redis streaming connector

> What about the Bahir streaming connectors? Are they considered canonical?
> Would they be merged into the main project at some point? IIUC we can use
> the Table API etc. to write data to Redis using that, right?

I think it can still be used with Flink 1.12 and 1.13. However, it has not been updated for 5 months and is not implemented on top of the new interfaces introduced in FLIP-95. AFAIK, the community does not plan to merge it.

> What more will be required for us to use SQL? (as in, specify the
> connector in the WITH clause as redis)

To use the Bahir connector with SQL, you need to build the project, move the uber jar to the lib directory of your Flink distribution, and fill in the necessary properties [1][2].

[1] https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
[2] https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java

Best,
Yangze Guo

On Wed, Sep 15, 2021 at 11:34 AM Osada Paranaliyanage wrote:
>
> Hi David,
>
> What about the Bahir streaming connectors? Are they considered
> canonical? Would they be merged into the main project at some point?
> IIUC we can use the Table API etc. to write data to Redis using that, right?
> What more will be required for us to use SQL? (as in, specify the
> connector in the WITH clause as redis)
>
> Thanks,
> Osada.
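As a rough illustration of the "fill in the necessary properties" step above, a DDL for the Bahir Redis sink might look something like the following. This is a hypothetical sketch: the exact property keys and supported commands must be verified against RedisTableSinkFactory and RedisValidator linked in [1][2], since Bahir uses the legacy (pre-FLIP-95) property-based factory.

```sql
-- Hypothetical sketch only; property keys are assumptions to be checked
-- against RedisValidator, not a verified configuration.
CREATE TABLE redis_sink (
  k STRING,
  v STRING
) WITH (
  'connector.type' = 'redis',   -- legacy factory identifier
  'redis-mode'     = 'single',  -- single node, as opposed to cluster/sentinel
  'command'        = 'SET',     -- Redis write command to apply per row
  'host'           = 'localhost',
  'port'           = '6379'
);
```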
> From: David Morávek
> Sent: Tuesday, September 14, 2021 7:53 PM
> To: Osada Paranaliyanage
> Cc: user@flink.apache.org
> Subject: Re: Streaming SQL support for redis streaming connector
>
> Hi Osada,
>
> in theory, building a Redis table from a "CDC stream" should definitely be
> doable. Unfortunately, Flink currently doesn't have an official Redis sink
> for the Table API, and there is no ongoing effort to add one, so it would
> need to be implemented first. The resulting syntax would be pretty much the
> same as what's outlined in the mentioned example.
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 2:57 PM Osada Paranaliyanage wrote:
>
> Hi All, We are looking to use Flink to build a materialized view of a
> relational DB and a document DB using CDC streams. For this purpose we
> would like to use Redis for hosting the materialized view. Can we do this
> in streaming SQL? We have worked through
> https://github.com/ververica/flink-sql-CDC and can see how this will work
> with ES as a sink. But can we use Redis as the sink? Where do we find the
> syntax for that?
>
> Thanks,
> Osada.
RE: Streaming SQL support for redis streaming connector
Hi Leonard,

That's awesome news. We are actually using DocumentDB. Any idea how much work it would be to make it work with DocumentDB instead?

Thanks,
Osada.

From: Leonard Xu
Sent: Wednesday, September 15, 2021 1:08 PM
To: Osada Paranaliyanage
Cc: user@flink.apache.org
Subject: Re: Streaming SQL support for redis streaming connector

Hi Osada,

Just want to offer some material here. The flink-cdc-connectors project [1] may also help you; we recently added support for the document DB MongoDB [2].

Best,
Leonard

[1] https://github.com/ververica/flink-cdc-connectors
[2] https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html

On Sep 14, 2021, at 20:56, Osada Paranaliyanage wrote:

Hi All, We are looking to use Flink to build a materialized view of a relational DB and a document DB using CDC streams. For this purpose we would like to use Redis for hosting the materialized view. Can we do this in streaming SQL? We have worked through https://github.com/ververica/flink-sql-CDC and can see how this will work with ES as a sink. But can we use Redis as the sink? Where do we find the syntax for that?

Thanks,
Osada.
Re: RocksDB state not cleaned up
Hi David,

Confirmed with the RocksDB log that Stephan's observation is the root cause: compaction doesn't clean up the high-level SST files fast enough. Do you think manual clean-up by registering a timer is the way to go, or is there any RocksDB parameter that can be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao wrote:

> Hi David,
>
> If I read Stephan's comment correctly, TTL doesn't work well for cases
> where we have too many levels, like fast-growing state, as compaction
> doesn't clean up high-level SST files in time. Is this correct? If yes,
> should we register a timer with the TTL time and manually clean up the
> state (state.clear()) when the timer fires?
>
> I will turn on RocksDB logging as well as compaction logging [1] to verify
> this.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>
> On Tue, Sep 14, 2021 at 5:38 PM David Morávek wrote:
>
>> Hi Tao,
>>
>> my intuition is that the compaction of the SST files is not triggering.
>> By default, it's only triggered by the size ratios of the different
>> levels [1], and the TTL mechanism has no effect on it.
>>
>> Some reasoning from Stephan:
>>
>>> It's very likely to have large files in higher levels that haven't been
>>> compacted in a long time and thus just stay around.
>>>
>>> This might be especially possible if you insert a lot in the beginning
>>> (build up many levels) and then have a moderate rate of modifications, so
>>> the changes and expiration keep happening purely in the merges /
>>> compactions of the first levels. Then the later levels may stay unchanged
>>> for quite some time.
>>
>> You should be able to see compaction details by setting RocksDB logging
>> to INFO [2]. Can you please check these and validate whether this really
>> is the case?
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>> [2]
>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>
>> Best,
>> D.
>>
>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao wrote:
>>
>>> Hi team,
>>>
>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>> value state doesn't exist, and we never update it again after the state
>>> is non-empty. The key of the value state is a timestamp. My understanding
>>> of such TTL settings is that the size of all SST files remains flat
>>> (disregarding the impact of space amplification) after 1 day, as the
>>> daily data volume is more or less the same. However, the RocksDB native
>>> metrics show that the SST files have continued to grow since I started
>>> the job. I checked the SST files in local storage and I can see SST files
>>> aged 1 month (from when I started the job). What is the possible reason
>>> for the SST files not being cleaned up?
>>>
>>> The Flink version is 1.12.1.
>>> State backend is RocksDB with incremental checkpoints.
>>> All default configuration for RocksDB.
>>> Per-job mode on YARN, checkpointing to S3.
>>>
>>> Here is the code that sets the value state:
>>>
>>> public void open(Configuration parameters) {
>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>         .newBuilder(Time.days(1))
>>>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>         .cleanupInRocksdbCompactFilter(300_000)
>>>         .build();
>>>     ValueStateDescriptor<Click> clickStateDescriptor =
>>>         new ValueStateDescriptor<>("click", Click.class);
>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>
>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>         .newBuilder(Time.days(1))
>>>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>         .cleanupInRocksdbCompactFilter(30_000_000)
>>>         .build();
>>>     ValueStateDescriptor<A> adsStateDescriptor =
>>>         new ValueStateDescriptor<>("ads", slimAdsClass);
>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>> }
>>>
>>> @Override
>>> public void processElement(Tuple3 tuple, Context ctx,
>>>         Collector<A> collector) throws Exception {
>>>     if (tuple.f1 != null) {
>>>         Click click = tuple.f1;
>>>
>>>         if (clickState.value() != null) {
>>>             return;
>>>         }
>>>
>>>         clickState.update(click);
>>>
>>>         A adsFromState = adsState.value();
>>>         if (adsFromState != null) {
>>>             collector.collect(adsFromState);
>>>         }
>>>     } else {
>>>         A ads = tuple.f2;
>>>
>>>         if (adsState.value() != null) {
>>>             return;
>>>         }
>>
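As a concrete illustration of the timer-based manual clean-up idea discussed in this thread, a minimal sketch might look like the following. This is not from the thread: the class, key type, and state names are hypothetical, and it assumes a KeyedProcessFunction so that per-key timers are available.

```java
// Sketch: clear state via an explicit processing-time timer instead of
// relying on the RocksDB compaction filter to eventually drop expired
// entries. Class, key type, and names are hypothetical.
public class ClickDeduplicator extends KeyedProcessFunction<Long, Click, Click> {

    private transient ValueState<Click> clickState;

    @Override
    public void open(Configuration parameters) {
        clickState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("click", Click.class));
    }

    @Override
    public void processElement(Click click, Context ctx, Collector<Click> out)
            throws Exception {
        if (clickState.value() == null) {
            clickState.update(click);
            // fire one day after the first write, mirroring the TTL above
            long oneDayMs = Time.days(1).toMilliseconds();
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + oneDayMs);
            out.collect(click);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Click> out)
            throws Exception {
        // manual "TTL": the entry is gone regardless of compaction progress
        clickState.clear();
    }
}
```

Note that this trades the compaction-filter approach for one timer per key, which itself adds state; whether that is cheaper depends on the key cardinality.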
Re: KafkaSource builder and checkpointing with parallelism > kafka partitions
Thanks for the feedback.

> May I ask why you have fewer partitions than the parallelism? I would be
> happy to learn more about your use case to better understand the
> motivation.

The use case is that topic A contains just a few messages with product metadata that rarely gets updated, while topic B contains user interactions with the products (and many more messages). For topic A we thought that one partition would be sufficient to hold the metadata, while we have 32 partitions for topic B. Due to the load on topic B, we use a parallelism of 2-8.

Thanks,
Lars

On Thu, Sep 16, 2021 at 9:09 AM Fabian Paul wrote:

> Hi all,
>
> The problem you are seeing, Lars, is somewhat intended behaviour,
> unfortunately. With the batch/stream unification, every Kafka partition is
> treated as a kind of workload assignment. If one subtask receives a signal
> that there is no workload anymore, it goes into the FINISHED state.
> As already pointed out, this restriction will be lifted in the near future.
>
> I went through the code and I think in your case you can set the following
> configuration option [1], which should give behaviour equal to that of the
> old source. This will prevent the enumerator from sending a final signal
> to the subtasks, so they will not go into the FINISHED state.
>
> May I ask why you have fewer partitions than the parallelism? I would be
> happy to learn more about your use case to better understand the
> motivation.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery
Re: Fast serialization for Kotlin data classes
True, that's a valid concern you raised here, Alexis. Thanks for pointing that out.

On Thu, Sep 16, 2021 at 1:58 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

> Someone please correct me if I'm wrong but, until FLINK-16686 [1] is
> fixed, a class must be a POJO to be used in managed state with RocksDB,
> right? That's not to say that the approach with TypeInfoFactory won't
> work, just that even then it will mean none of the data classes can be
> used for managed state.
>
> [1] https://issues.apache.org/jira/browse/FLINK-16686
>
> Regards,
> Alexis.
>
> From: Matthias Pohl
> Sent: Thursday, 16 September 2021 13:12
> To: Alex Cruise
> Cc: Flink ML
> Subject: Re: Fast serialization for Kotlin data classes
>
> Hi Alex,
>
> have you had a look at TypeInfoFactory? That might be the best way to come
> up with a custom serialization mechanism. See the docs [1] for further
> details.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> On Tue, Sep 14, 2021 at 8:33 PM Alex Cruise wrote:
>
> Hi there,
>
> I appreciate the fact that Flink has built-in support for making POJO and
> Scala `case class` serialization faster, but in my project we use immutable
> Kotlin `data class`es (analogous to Scala `case class`es) extensively, and
> we'd really prefer not to make them POJOs, mostly for style/taste reasons
> (e.g. they'd need a default constructor and setters, both anathema!)
>
> Does anyone know of a good way for us to keep using idiomatic, immutable
> Kotlin data classes, but get much faster serialization performance in
> Flink?
>
> Thanks!
>
> -0xe1a
RE: Fast serialization for Kotlin data classes
Someone please correct me if I'm wrong but, until FLINK-16686 [1] is fixed, a class must be a POJO to be used in managed state with RocksDB, right? That's not to say that the approach with TypeInfoFactory won't work, just that even then it will mean none of the data classes can be used for managed state.

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

From: Matthias Pohl
Sent: Thursday, 16 September 2021 13:12
To: Alex Cruise
Cc: Flink ML
Subject: Re: Fast serialization for Kotlin data classes

Hi Alex,

have you had a look at TypeInfoFactory? That might be the best way to come up with a custom serialization mechanism. See the docs [1] for further details.

Best,
Matthias

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Sep 14, 2021 at 8:33 PM Alex Cruise <a...@cluonflux.com> wrote:

Hi there,

I appreciate the fact that Flink has built-in support for making POJO and Scala `case class` serialization faster, but in my project we use immutable Kotlin `data class`es (analogous to Scala `case class`es) extensively, and we'd really prefer not to make them POJOs, mostly for style/taste reasons (e.g. they'd need a default constructor and setters, both anathema!)

Does anyone know of a good way for us to keep using idiomatic, immutable Kotlin data classes, but get much faster serialization performance in Flink?

Thanks!

-0xe1a
Re: Fast serialization for Kotlin data classes
Hi Alex,

have you had a look at TypeInfoFactory? That might be the best way to come up with a custom serialization mechanism. See the docs [1] for further details.

Best,
Matthias

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Sep 14, 2021 at 8:33 PM Alex Cruise wrote:

> Hi there,
>
> I appreciate the fact that Flink has built-in support for making POJO and
> Scala `case class` serialization faster, but in my project we use immutable
> Kotlin `data class`es (analogous to Scala `case class`es) extensively, and
> we'd really prefer not to make them POJOs, mostly for style/taste reasons
> (e.g. they'd need a default constructor and setters, both anathema!)
>
> Does anyone know of a good way for us to keep using idiomatic, immutable
> Kotlin data classes, but get much faster serialization performance in
> Flink?
>
> Thanks!
>
> -0xe1a
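To make the TypeInfoFactory suggestion above concrete for a Kotlin data class, a hedged sketch could look like the following. The Measurement class and MeasurementTypeInformation are invented for illustration; implementing the TypeInformation and its TypeSerializer is the real work, and the full contract is in the docs linked above.

```kotlin
import java.lang.reflect.Type
import org.apache.flink.api.common.typeinfo.TypeInfo
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.TypeInfoFactory

// The annotation tells Flink's type extraction to use the factory below
// instead of falling back to Kryo for this (non-POJO) data class.
@TypeInfo(MeasurementTypeInfoFactory::class)
data class Measurement(val sensorId: String, val value: Double)

class MeasurementTypeInfoFactory : TypeInfoFactory<Measurement>() {
    override fun createTypeInfo(
        t: Type,
        genericParameters: Map<String, TypeInformation<*>>
    ): TypeInformation<Measurement> {
        // MeasurementTypeInformation is hypothetical: a TypeInformation
        // backed by a hand-written TypeSerializer for Measurement.
        return MeasurementTypeInformation()
    }
}
```

The factory has to be written once per data class (or generated), which is the trade-off against simply accepting Kryo's generic serialization.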
Re: KafkaSource builder and checkpointing with parallelism > kafka partitions
Hi all,

The problem you are seeing, Lars, is somewhat intended behaviour, unfortunately. With the batch/stream unification, every Kafka partition is treated as a kind of workload assignment. If one subtask receives a signal that there is no workload anymore, it goes into the FINISHED state. As already pointed out, this restriction will be lifted in the near future.

I went through the code and I think in your case you can set the following configuration option [1], which should give behaviour equal to that of the old source. This will prevent the enumerator from sending a final signal to the subtasks, so they will not go into the FINISHED state.

May I ask why you have fewer partitions than the parallelism? I would be happy to learn more about your use case to better understand the motivation.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery
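The option [1] refers to is the partition discovery interval of the new KafkaSource. A sketch of setting it via the builder follows; the broker address, topics, and group id are placeholders, not values from the thread.

```java
// Sketch: a positive partition.discovery.interval.ms keeps the enumerator
// polling for new partitions, so subtasks without an assigned partition
// stay running instead of going into the FINISHED state.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")          // placeholder
    .setTopics("product-metadata", "user-events") // placeholder topics A and B
    .setGroupId("my-consumer-group")              // placeholder
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setProperty("partition.discovery.interval.ms", "60000")
    .build();
```

With discovery enabled, a parallelism higher than the partition count of the smaller topic no longer causes idle subtasks to finish, at the cost of a periodic metadata request to the brokers.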