Hi Ramya

I just tried to code the example which is worked in 1.10 which I using a custom 
RichFlatMapFunction to connect ,transform data  and release the conn in its 
override method.

// app.java
public class RedisMapDemo {
    public static void main(String[] args) throws Exception {
        // 1. source
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        DataStream<String> sourceStream = env.fromElements("test_value");

        // 2. custom map function
        DataStream<String> redisUpdatedStream = sourceStream.flatMap(new 
RedisFlatMap());

        redisUpdatedStream.print();
        env.execute("testing redis flatmap");
    }
}

// this should be saved as another java file  (RedisFlatMap.java)
public class RedisFlatMap extends RichFlatMapFunction<String, String> {
    String TEST_REDIS_KEY = "my_first_lettuce_key";
    RedisClient redisClient;
    StatefulRedisConnection<String, String> connection;
    RedisCommands<String, String> syncCommands;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        redisClient = RedisClient.create("redis://localhost:6379/0");
        connection = redisClient.connect();
        syncCommands = connection.sync();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // maybe release conn here ?
        connection.close();
        redisClient.shutdown();
    }

    @Override
    public void flatMap(String inputString, Collector<String> out)
        throws Exception {
        // 1. write to redis
        // syncCommands.set(TEST_REDIS_KEY,  " Hello, Redis!");

        // 2. read from redis
        String tmpValue = syncCommands.get(TEST_REDIS_KEY);

        // 3. transform
        out.collect(inputString + " - " + tmpValue);
    }
}
-----邮件原件-----
发件人: Ramya Ramamurthy [mailto:hair...@gmail.com] 
发送时间: 2020年7月21日 星期二 18:42
收件人: dev@flink.apache.org
主题: Flink Redis connectivity

Hi,

As per the understanding we have from the documentation, I guess its not 
possible to take the redis connection within the Data Stream. In that case, how 
should i proceed ? How can i access a DB client object within the stream ??

I am using Flink 1.7. any help here would be appreciated. Thanks.

  RedisClient redisClient = new
RedisClient(RedisURI.create("redis://localhost:6379"));
                RedisConnection<String, String> client = redisClient.connect(); 
DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
String>) value -> {

            String ct = value.getField(5).toString();

            String res = "";
            if (ct.equals("14") || ct.equals("4")) {

                res = client.set("key", "val");
            }
            return res;
        });

Thanks,

Reply via email to