hello, Manas Kale.
From the log, it can be found that the exception was thrown on the 'open()' method of the RedisSink class. You can inherit the RedisSink class, then override the 'open()' method, and handle the exception as you wish.Or no longer use Apache Bahir[1] Flink redis connector class library, and inherit RichSinkFunction to develop a custom RedisSink class. Regards Shubin Ruan At 2020-10-15 19:27:29, "Manas Kale" <manaskal...@gmail.com> wrote: Hi, I have a streaming application that pushes output to a redis cluster sink. I am using the Apache Bahir[1] Flink redis connector for this. I want to handle the case when the redis server is unavailable. I am following the same pattern as outlined by them in [1]: FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build(); DataStream<String> stream = ...; stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()); However, if the redis server is not available, my whole job crashes with this exception: ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis has not been properly initialized: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at redis.clients.util.Pool.getResource(Pool.java:53) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226) ... I want to handle and ignore such exceptions thrown by the RedisSink class. Where exactly do I put my try/catch to do this? Enclosing the last in the code snippet with try/catch does not work. I believe the only way to do this would be to handle the exception in the RedisSink class, but that is a library class provided by Bahir. Is my thinking correct? asd [1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/ Regards, Manas