Hi,
I have a streaming application that pushes output to a redis cluster sink.
I am using the Apache Bahir[1] Flink redis connector for this. I want to
handle the case when the redis server is unavailable.
I am following the same pattern as outlined by them in [1]:

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new
InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new
RedisExampleMapper());

However, if the redis server is not available, my whole job crashes with
this exception:

ERROR org.apache.flink.streaming.connectors.redis.RedisSink         - Redis
has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a
resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
...

I want to handle and ignore such exceptions thrown by the RedisSink class.
Where exactly do I put my try/catch to do this? Enclosing the last in the
code snippet with try/catch does not work.
I believe the only way to do this would be to handle the exception in the
RedisSink class, but that is a library class provided by Bahir. Is my
thinking correct?


asd
[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/


Regards,
Manas

Reply via email to