You will have to create a custom version of the redis connector that
ignores such exceptions.
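A try/catch around addSink() cannot help, because that call only registers the sink in the job graph. The exception is thrown later, inside the running task. Below is a minimal sketch of the wrapping idea, not the Bahir implementation. RecordSink is a hypothetical stand-in for the sink interface, and IgnoringSink and droppedCount are names invented for this illustration. In a real job the same pattern would presumably live in your own copy of the sink's open() and invoke() methods.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a sink such as Bahir's RedisSink (assumption, not a Flink type).
interface RecordSink<T> {
    void open() throws Exception;          // acquire the connection
    void invoke(T value) throws Exception; // write one record
}

// Wraps another sink and swallows its failures instead of failing the job.
final class IgnoringSink<T> implements RecordSink<T> {
    private final RecordSink<T> delegate;
    private final AtomicLong dropped = new AtomicLong();
    private boolean ready = false;

    IgnoringSink(RecordSink<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open() {
        try {
            delegate.open();
            ready = true;
        } catch (Exception e) {
            // Server unavailable: remember that and carry on.
            ready = false;
        }
    }

    @Override
    public void invoke(T value) {
        if (!ready) {
            open(); // try to reconnect before writing
        }
        if (!ready) {
            dropped.incrementAndGet(); // still down: drop the record
            return;
        }
        try {
            delegate.invoke(value);
        } catch (Exception e) {
            dropped.incrementAndGet();
            ready = false; // force a reconnect attempt on the next record
        }
    }

    // Number of records discarded because the delegate failed.
    long droppedCount() {
        return dropped.get();
    }
}
```

Note that this drops records silently while the server is down, and it retries the connection on every record, which may be too aggressive for a high-throughput stream. Logging the failure or adding a back-off would be reasonable additions.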
On 10/15/2020 1:27 PM, Manas Kale wrote:
Hi,
I have a streaming application that pushes output to a redis cluster
sink. I am using the Apache Bahir[1] Flink redis connector for this. I
want to handle the case when the redis server is unavailable.
I am following the same pattern as outlined by them in [1]:
    FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setNodes(new HashSet<InetSocketAddress>(
            Arrays.asList(new InetSocketAddress(5601))))
        .build();
    DataStream<String> stream = ...;
    stream.addSink(new RedisSink<Tuple2<String, String>>(conf,
        new RedisExampleMapper()));
However, if the redis server is not available, my whole job crashes
with this exception:
ERROR org.apache.flink.streaming.connectors.redis.RedisSink -
Redis has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get
a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
...
I want to handle and ignore such exceptions thrown by the RedisSink
class. Where exactly do I put my try/catch to do this? Enclosing the
last line of the code snippet in a try/catch does not work.
I believe the only way to do this would be to handle the exception in
the RedisSink class, but that is a library class provided by Bahir. Is
my thinking correct?
[1] https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
Regards,
Manas