Hello Manas Kale,

The log shows that the exception is thrown in the 'open()' method of the
RedisSink class. You can subclass RedisSink, override 'open()', and handle
the exception there as you wish. Alternatively, stop using the Apache Bahir[1]
Flink Redis connector and write your own Redis sink class by extending
RichSinkFunction.
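
To illustrate the idea, here is a rough, dependency-free sketch of the pattern.
It does not use the real Flink or Bahir classes: SimpleSink is a hypothetical
stand-in for the open/invoke/close lifecycle of RichSinkFunction, TolerantSink
is a hypothetical wrapper that swallows connection failures, and FlakySink is a
fake backend used only to simulate an unavailable server. In a real job the same
try/catch logic would go into your own RichSinkFunction (or a RedisSink
subclass). I assume that a subclass would also need to guard invoke(), since
swallowing the exception in open() alone would leave the sink without a usable
connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the lifecycle of a Flink RichSinkFunction.
interface SimpleSink<T> {
    void open() throws Exception;
    void invoke(T value) throws Exception;
    void close() throws Exception;
}

// Wraps another sink and ignores connection failures instead of failing the job.
final class TolerantSink<T> implements SimpleSink<T> {
    private final SimpleSink<T> delegate;
    private boolean connected = false;
    private long dropped = 0;

    TolerantSink(SimpleSink<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open() {
        tryOpen();
    }

    // Attempts to open the delegate; logs and remembers the failure instead of rethrowing.
    private void tryOpen() {
        try {
            delegate.open();
            connected = true;
        } catch (Exception e) {
            connected = false;
            System.err.println("Sink unavailable, records will be dropped: " + e.getMessage());
        }
    }

    @Override
    public void invoke(T value) {
        if (!connected) {
            tryOpen(); // lazy reconnect attempt on each record while disconnected
        }
        if (!connected) {
            dropped++; // server still unavailable: drop the record
            return;
        }
        try {
            delegate.invoke(value);
        } catch (Exception e) {
            connected = false; // force a reconnect attempt on the next record
            dropped++;
        }
    }

    @Override
    public void close() {
        try {
            delegate.close();
        } catch (Exception e) {
            // nothing useful to do on shutdown
        }
    }

    long droppedCount() {
        return dropped;
    }

    boolean isConnected() {
        return connected;
    }
}

// Fake backend: open() fails a fixed number of times, then succeeds.
final class FlakySink implements SimpleSink<String> {
    private int failuresLeft;
    final List<String> written = new ArrayList<>();

    FlakySink(int failures) {
        this.failuresLeft = failures;
    }

    @Override
    public void open() {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new IllegalStateException("Could not get a resource from the pool");
        }
    }

    @Override
    public void invoke(String value) {
        written.add(value);
    }

    @Override
    public void close() {
    }
}
```

With new TolerantSink<>(new FlakySink(2)), the initial open() fails quietly, the
first record is dropped while the backend is still down, and the second record
is written once the reconnect succeeds. Retrying on every record is the simplest
choice for a sketch; with a real connection pool you would probably want a
back-off between attempts.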


Regards
Shubin Ruan

At 2020-10-15 19:27:29, "Manas Kale" <manaskal...@gmail.com> wrote:

Hi,
I have a streaming application that pushes output to a redis cluster sink. I am 
using the Apache Bahir[1] Flink redis connector for this. I want to handle the 
case when the redis server is unavailable. 
I am following the same pattern as outlined by them in [1]:
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
However, if the redis server is not available, my whole job crashes with this 
exception:


ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
    at redis.clients.util.Pool.getResource(Pool.java:53)
    at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)

...


I want to handle and ignore such exceptions thrown by the RedisSink class. 
Where exactly do I put my try/catch to do this? Enclosing the last line of the
code snippet in a try/catch does not work.
I believe the only way to do this would be to handle the exception in the 
RedisSink class, but that is a library class provided by Bahir. Is my thinking 
correct?




[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/




Regards,
Manas




 
