Do you need to read back the state you maintain in Redis? The
flink-connector-redis only contains a sink operator.
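
If you do need to read it back, the `open` approach suggested earlier in the
thread is probably the way to go. Flink serializes the function object to ship
it to the workers, and the lambda in your snippet captures `client`, which is
most likely not serializable. Below is a minimal sketch of the idea using only
the JDK, so it runs without Flink or Redis. `FakeRedisClient`, `RichMapper`,
`EagerMapper`, `LazyMapper` and `OpenLifecycleSketch` are hypothetical stand-ins
that I made up for illustration; they are not Flink or Redis client classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Redis client; deliberately NOT Serializable.
class FakeRedisClient {
    private final Map<String, String> store = new HashMap<>();

    String set(String key, String value) {
        store.put(key, value);
        return "OK";
    }
}

// Hypothetical stand-in for the open()/map() lifecycle of a rich function.
abstract class RichMapper<I, O> implements Serializable {
    void open() throws Exception {}

    abstract O map(I value) throws Exception;
}

// Anti-pattern: the client is created eagerly, so it is part of the
// serialized state of the function.
class EagerMapper extends RichMapper<String, String> {
    private final FakeRedisClient client = new FakeRedisClient();

    @Override
    String map(String ct) {
        return client.set("key", ct);
    }
}

// Pattern: the client is transient and only created in open(), i.e. after
// the function has arrived on the worker.
class LazyMapper extends RichMapper<String, String> {
    private transient FakeRedisClient client;

    @Override
    void open() {
        client = new FakeRedisClient();
    }

    @Override
    String map(String ct) {
        return (ct.equals("14") || ct.equals("4")) ? client.set("key", "val") : "";
    }
}

class OpenLifecycleSketch {
    // Serialize and deserialize, roughly what happens when a function is shipped.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Returns false when the function drags a non-serializable field along.
    static boolean canShip(Serializable fn) {
        try {
            ship(fn);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(canShip(new EagerMapper())); // prints false
        LazyMapper remote = ship(new LazyMapper());
        remote.open();                                  // client is created here
        System.out.println(remote.map("14"));           // prints OK
        System.out.println(remote.map("7").isEmpty());  // prints true
    }
}
```

In a real job the class would extend Flink's `RichMapFunction`, create the
Redis client in `open(Configuration)` and release it in `close()`.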

Best,
Yangze Guo

On Thu, Jul 23, 2020 at 3:28 PM Ramya Ramamurthy <hair...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your response.
>
> I am trying to maintain some state in Redis. For each incoming packet,
> I try to map the information in Redis, and then finally use ES as a sink to
> push the data.
> But with flink-connector-redis, I am not sure whether the same can be
> achieved. Could you please elaborate on this? It would be very helpful.
>
> Thank you.
>
>
> On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <myas...@live.com> wrote:
>
> > Hi Ramya
> >
> > Have you ever tried flink-connector-redis
> > <https://github.com/apache/bahir-flink/tree/master/flink-connector-redis>
> > in Bahir [1][2]? I think you could use it or draw some insights from it.
> >
> > [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
> > [2] https://github.com/apache/bahir-flink
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Yangze Guo <karma...@gmail.com>
> > Sent: Tuesday, July 21, 2020 18:50
> > To: dev <dev@flink.apache.org>
> > Subject: Re: Flink Redis connectivity
> >
> > Hi,
> >
> > I think you could implement `RichMapFunction` and create `redisClient`
> > in the `open` method.
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <hair...@gmail.com>
> > wrote:
> > >
> > > Hi,
> > >
> > > As far as we understand from the documentation, it is not
> > > possible to use the Redis connection inside the DataStream. In that
> > > case, how should I proceed? How can I access a DB client object within
> > > the stream?
> > >
> > > I am using Flink 1.7. Any help here would be appreciated. Thanks.
> > >
> > > RedisClient redisClient = new RedisClient(RedisURI.create("redis://localhost:6379"));
> > > RedisConnection<String, String> client = redisClient.connect();
> > > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, String>) value -> {
> > >     String ct = value.getField(5).toString();
> > >
> > >     String res = "";
> > >     if (ct.equals("14") || ct.equals("4")) {
> > >         res = client.set("key", "val");
> > >     }
> > >     return res;
> > > });
> > >
> > > Thanks,
> >
