You will have to create a custom version of the redis connector that ignores such exceptions.

On 10/15/2020 1:27 PM, Manas Kale wrote:
Hi,
I have a streaming application that pushes output to a redis cluster sink. I am using the Apache Bahir[1] Flink redis connector for this. I want to handle the case when the redis server is unavailable.
I am following the same pattern as outlined by them in [1]:
|FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build(); DataStream<String> stream = ...; stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());| However, if the redis server is not available, my whole job crashes with this exception:

ERROR org.apache.flink.streaming.connectors.redis.RedisSink         - Redis has not been properly initialized: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
...

I want to handle and ignore such exceptions thrown by the RedisSink class. Where exactly do I put my try/catch to do this? Enclosing the last in the code snippet with try/catch does not work. I believe the only way to do this would be to handle the exception in the RedisSink class, but that is a library class provided by Bahir. Is my thinking correct?


asd
[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/


Regards,
Manas


Reply via email to