Hi Ramya I just tried to code the example which is worked in 1.10 which I using a custom RichFlatMapFunction to connect ,transform data and release the conn in its override method.
// app.java public class RedisMapDemo { public static void main(String[] args) throws Exception { // 1. source final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setParallelism(1); DataStream<String> sourceStream = env.fromElements("test_value"); // 2. custom map function DataStream<String> redisUpdatedStream = sourceStream.flatMap(new RedisFlatMap()); redisUpdatedStream.print(); env.execute("testing redis flatmap"); } } // this should be saved as another java file (RedisFlatMap.java) public class RedisFlatMap extends RichFlatMapFunction<String, String> { String TEST_REDIS_KEY = "my_first_lettuce_key"; RedisClient redisClient; StatefulRedisConnection<String, String> connection; RedisCommands<String, String> syncCommands; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); redisClient = RedisClient.create("redis://localhost:6379/0"); connection = redisClient.connect(); syncCommands = connection.sync(); } @Override public void close() throws Exception { super.close(); // maybe release conn here ? connection.close(); redisClient.shutdown(); } @Override public void flatMap(String inputString, Collector<String> out) throws Exception { // 1. write to redis // syncCommands.set(TEST_REDIS_KEY, " Hello, Redis!"); // 2. read from redis String tmpValue = syncCommands.get(TEST_REDIS_KEY); // 3. transform out.collect(inputString + " - " + tmpValue); } } -----邮件原件----- 发件人: Ramya Ramamurthy [mailto:hair...@gmail.com] 发送时间: 2020年7月21日 星期二 18:42 收件人: dev@flink.apache.org 主题: Flink Redis connectivity Hi, As per the understanding we have from the documentation, I guess its not possible to take the redis connection within the Data Stream. In that case, how should i proceed ? How can i access a DB client object within the stream ?? I am using Flink 1.7. any help here would be appreciated. Thanks. RedisClient redisClient = new RedisClient(RedisURI.create("redis://localhost:6379")); RedisConnection<String, String> client = redisClient.connect(); DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, String>) value -> { String ct = value.getField(5).toString(); String res = ""; if (ct.equals("14") || ct.equals("4")) { res = client.set("key", "val"); } return res; }); Thanks,