Do you need to read the state you maintained from Redis? The flink-connector-redis only contains sink operator.
Best, Yangze Guo On Thu, Jul 23, 2020 at 3:28 PM Ramya Ramamurthy <hair...@gmail.com> wrote: > > Hi, > > Thanks for your response. > > I am trying to maintain some state in redis, and for each incoming packet, > I try to map the information on redis, and then finally use ES as a sink to > push the data. > But with this flink-connector-redis, I am not sure if the same can be > achieved. Can you please elaborate on this , so it would be very helpful. > > Thank you. > > > On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <myas...@live.com> wrote: > > > Hi Ramya > > > > Have you ever tried flink-connector-redis< > > https://github.com/apache/bahir-flink/tree/master/flink-connector-redis> > > in bahir [1][2]? I think you could use it or obtain some insights. > > > > [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/ > > [2] https://github.com/apache/bahir-flink > > > > Best > > Yun Tang > > > > ________________________________ > > From: Yangze Guo <karma...@gmail.com> > > Sent: Tuesday, July 21, 2020 18:50 > > To: dev <dev@flink.apache.org> > > Subject: Re: Flink Redis connectivity > > > > Hi, > > > > I think you could implement `RichMapFunction` and create `redisClient` > > in the `open` method. > > > > Best, > > Yangze Guo > > > > On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <hair...@gmail.com> > > wrote: > > > > > > Hi, > > > > > > As per the understanding we have from the documentation, I guess its not > > > possible to take the redis connection within the Data Stream. In that > > case, > > > how should i proceed ? How can i access a DB client object within the > > > stream ?? > > > > > > I am using Flink 1.7. any help here would be appreciated. Thanks. > > > > > > RedisClient redisClient = new > > > RedisClient(RedisURI.create("redis://localhost:6379")); > > > RedisConnection<String, String> client = > > > redisClient.connect(); > > > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, > > > String>) value -> { > > > > > > String ct = value.getField(5).toString(); > > > > > > String res = ""; > > > if (ct.equals("14") || ct.equals("4")) { > > > > > > res = client.set("key", "val"); > > > } > > > return res; > > > }); > > > > > > Thanks, > >