Hi, I have a streaming application that pushes output to a redis cluster sink. I am using the Apache Bahir[1] Flink redis connector for this. I want to handle the case when the redis server is unavailable. I am following the same pattern as outlined by them in [1]:
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build(); DataStream<String> stream = ...; stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()); However, if the redis server is not available, my whole job crashes with this exception: ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis has not been properly initialized: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at redis.clients.util.Pool.getResource(Pool.java:53) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226) ... I want to handle and ignore such exceptions thrown by the RedisSink class. Where exactly do I put my try/catch to do this? Enclosing the last in the code snippet with try/catch does not work. I believe the only way to do this would be to handle the exception in the RedisSink class, but that is a library class provided by Bahir. Is my thinking correct? asd [1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/ Regards, Manas