[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#discussion_r250397313 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ## @@ -470,7 +470,20 @@ public void open(Configuration configuration) throws Exception { subscribedPartitionsToStartOffsets = new HashMap<>(); - List allPartitions = partitionDiscoverer.discoverPartitions(); + List allPartitions; + try { + allPartitions = partitionDiscoverer.discoverPartitions(); + } finally { + if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { + // when partition discovery is disabled, + // we should close partitionDiscoverer after the initial discovery. + // otherwise we may have connection leak, + // if open method throws an exception after partitionDiscoverer is constructed. + // In this case, run method won't be executed + // and partitionDiscoverer.close() won't be called. + partitionDiscoverer.close(); Review comment: thank you for the suggestion. updated. please ignore white space changes caused by the indention change of try-finally block. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#discussion_r250398973 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ## @@ -732,9 +745,7 @@ public void run() { throw new RuntimeException(discoveryLoopError); } } else { - // won't be using the discoverer - partitionDiscoverer.close(); - + // partitionDiscoverer is already closed in open method Review comment: I am not sure if it is necessary to wrap `runFetchLoop` with try-finally. 1) when discovery disabled, we already take care of it in open method 2) when discovery enabled, cancel() takes care of it. what if some other place in run method throws an exception? we always need cancel() to clean up resource if necessary. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#discussion_r250397468 ## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ## @@ -464,6 +466,34 @@ public void go() throws Exception { runThread.sync(); } + @Test + public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception { + final AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class); + final AtomicBoolean partitionDiscovererClosed = new AtomicBoolean(false); + doAnswer((i) -> { + partitionDiscovererClosed.set(true); + return null; + }).when(mockPartitionDiscoverer).close(); + + final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumerThrowExceptionInOpen<>(mockPartitionDiscoverer); + consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything + + try { + setupConsumer( + consumer, + false, + null, + false, // enable checkpointing; auto commit should be ignored + 0, + 1); Review comment: good batch. fixed This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#discussion_r250397435 ## File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ## @@ -464,6 +466,34 @@ public void go() throws Exception { runThread.sync(); } + @Test + public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception { + final AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class); Review comment: great idea. done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an… URL: https://github.com/apache/flink/pull/7020#discussion_r250397313 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ## @@ -470,7 +470,20 @@ public void open(Configuration configuration) throws Exception { subscribedPartitionsToStartOffsets = new HashMap<>(); - List allPartitions = partitionDiscoverer.discoverPartitions(); + List allPartitions; + try { + allPartitions = partitionDiscoverer.discoverPartitions(); + } finally { + if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) { + // when partition discovery is disabled, + // we should close partitionDiscoverer after the initial discovery. + // otherwise we may have connection leak, + // if open method throws an exception after partitionDiscoverer is constructed. + // In this case, run method won't be executed + // and partitionDiscoverer.close() won't be called. + partitionDiscoverer.close(); Review comment: thank you for the suggestion. updated. please ignore white space changes caused by the new try-finally block. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services