[ 
https://issues.apache.org/jira/browse/FLINK-24369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433625#comment-17433625
 ] 

Yao Zhang commented on FLINK-24369:
-----------------------------------

Hi [~turtlebin],

I am not quite sure that what has been changed on Flink 1.13.0. But I suggest 
we might need a shutdown hook that ensure each operator is closed before JVM 
terminates. Does anyone else have better solutions?

> Resource Leak may happen if flink cluster runs abnormally and repeatedly retry
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-24369
>                 URL: https://issues.apache.org/jira/browse/FLINK-24369
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.13.0
>         Environment: flink version:1.13.0
> redisson version:3.16.1
>            Reporter: turtlebin
>            Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> I encountered a problem in the process of integrating Flink and Redisson. 
> When the task encounters abnormalities and keeps retries, it will cause the 
> number of Redis Clients to increase volatility (sometimes the number 
> increases, sometimes the number decreases, but the overall trend is growth). 
> Even if I shutdown the Redisson Instance by overwriting the close function , 
> the number of Redis-Clients cannot be prevented from continuing to grow, and 
> eventually the number of Clients will reach the upper limit and an error will 
> be thrown. Moreover, this situation only occurs in the Flink cluster 
> operation mode with version 1.13.0 or above, and the number of Redis-Clients 
> will remain stable in the local mode or in the cluster mode with version 
> under1.12. The test code is below. I wonder if you can provide specific 
> reasons and solutions for this situation, thank you.
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import java.util.Properties;
> import java.util.Random;
> public class ExceptionTest {
>     public static void main(String[] args) throws Exception{
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>         env.enableCheckpointing(1000 * 60);
>         DataStream<String> mock = createDataStream(env);
>         mock.keyBy(x -> new Random().nextInt(20))
>                 .process(new ExceptionTestFunction())
>                 .uid("batch-query-key-process")
>                 .filter(x->x!=null)
>                 .print();
>         env.execute("Exception-Test");
>     }
>     private static DataStream<String> 
> createDataStream(StreamExecutionEnvironment env) {
>         String topic = "test_topic_xhb03";
>         Properties test = new Properties();
>         test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
>         test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
>         FlinkKafkaConsumer<String> consumer = new 
> FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), test);
>         consumer.setStartFromLatest();
>         DataStream<String> source = env.addSource(consumer);
>         return source;
>     }
> }
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.redisson.Redisson;
> import org.redisson.api.RedissonRxClient;
> import org.redisson.config.Config;
> @Slf4j
> public class ExceptionTestFunction extends KeyedProcessFunction<Integer, 
> String, String> {
>     private RedissonRxClient redisson;
>     @Override
>     public void close() {
>         this.redisson.shutdown();
>         log.info(String.format("Shut down redisson instance in close method, 
> RedissonRxClient shutdown is %s", redisson.isShutdown()));
>     }
>    
>     @Override
>     public void open(Configuration parameters) {
>         String prefix = "redis://";
>         Config config = new Config();
>                         config.useSingleServer()
>                                 .setClientName("xhb-redisson-main")
>                                 .setTimeout(5000)
>                                 .setConnectTimeout(10000)
>                                 .setConnectionPoolSize(4)
>                                 .setConnectionMinimumIdleSize(2)
>                                 .setIdleConnectionTimeout(10000)
>                                 .setAddress("127.0.0.1:6379")
>                                 .setDatabase(0)
>                                 .setPassword(null);
>         this.redisson = Redisson.create(config).rxJava();
>     }
>     @Override
>     public void processElement(String value, Context ctx, Collector<String> 
> out) throws Exception {
>         throw new NullPointerException("Null Pointer in ProcessElement");
>     }
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to