[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397313
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ##
 @@ -470,7 +470,20 @@ public void open(Configuration configuration) throws Exception {

	subscribedPartitionsToStartOffsets = new HashMap<>();

-	List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
+	List<KafkaTopicPartition> allPartitions;
+	try {
+		allPartitions = partitionDiscoverer.discoverPartitions();
+	} finally {
+		if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
+			// when partition discovery is disabled,
+			// we should close partitionDiscoverer after the initial discovery.
+			// otherwise we may have connection leak,
+			// if open method throws an exception after partitionDiscoverer is constructed.
+			// In this case, run method won't be executed
+			// and partitionDiscoverer.close() won't be called.
+			partitionDiscoverer.close();
 
 Review comment:
   Thank you for the suggestion. Updated. Please ignore the whitespace changes caused by the indentation change of the try-finally block.
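
   For context on when `discoveryIntervalMillis` stays at `PARTITION_DISCOVERY_DISABLED`: the discoverer is reused in run() only if a discovery interval is set on the consumer properties. A minimal usage sketch, assuming the property key exposed as `FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS` and the universal `FlinkKafkaConsumer`; the broker address and topic name are placeholders:

	import java.util.Properties;
	import org.apache.flink.api.common.serialization.SimpleStringSchema;
	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

	Properties props = new Properties();
	props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
	// Without the property below, discoveryIntervalMillis stays at
	// PARTITION_DISCOVERY_DISABLED, so the discoverer is used exactly once in
	// open() and must be closed there to avoid leaking its Kafka connection.
	props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
	FlinkKafkaConsumer<String> source =
		new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props); // placeholder topic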


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250398973
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ##
 @@ -732,9 +745,7 @@ public void run() {
throw new RuntimeException(discoveryLoopError);
}
} else {
-   // won't be using the discoverer
-   partitionDiscoverer.close();
-
+   // partitionDiscoverer is already closed in open method
 
 Review comment:
   I am not sure it is necessary to wrap `runFetchLoop` with try-finally (a sketch of that wrapping follows below).
   1) When discovery is disabled, the discoverer is already closed in the open method.
   2) When discovery is enabled, cancel() takes care of it. And if some other place in the run method throws an exception, we still rely on cancel() to clean up the resource, so the try-finally would not add much.
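
   For readers following the thread, a minimal sketch of the try-finally wrapping under discussion, assuming the local name `fetcher` for the fetcher created in run() (the real name may differ) and assuming `AbstractPartitionDiscoverer.close()` tolerates being called twice; this illustrates the alternative being weighed, not the merged change:

	try {
		fetcher.runFetchLoop();
	} finally {
		// would release the discoverer even if run() exits exceptionally
		// without cancel() ever being invoked
		partitionDiscoverer.close();
	}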


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397468
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ##
 @@ -464,6 +466,34 @@ public void go() throws Exception {
runThread.sync();
}
 
+	@Test
+	public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception {
+		final AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class);
+		final AtomicBoolean partitionDiscovererClosed = new AtomicBoolean(false);
+		doAnswer((i) -> {
+			partitionDiscovererClosed.set(true);
+			return null;
+		}).when(mockPartitionDiscoverer).close();
+
+		final DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumerThrowExceptionInOpen<>(mockPartitionDiscoverer);
+		consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything
+
+		try {
+			setupConsumer(
+				consumer,
+				false,
+				null,
+				false, // enable checkpointing; auto commit should be ignored
+				0,
+				1);
 
 Review comment:
   Good catch. Fixed.
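
   The diff excerpt above stops inside the try block. As a hedged sketch only (not the text of the merged test), the remainder of such a test would typically fail the test if no exception surfaces and then assert that the mocked close() was observed:

			fail("expected the consumer's open() to throw");
		} catch (Exception expected) {
			// the dummy consumer is assumed to throw from open() after the
			// partition discoverer has been constructed
		}
		assertTrue("partitionDiscoverer should be closed when open() throws",
			partitionDiscovererClosed.get());
	}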


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-23 Thread GitBox
stevenzwu commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r250397435
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ##
 @@ -464,6 +466,34 @@ public void go() throws Exception {
runThread.sync();
}
 
+	@Test
+	public void testClosePartitionDiscovererWhenOpenThrowException() throws Exception {
+		final AbstractPartitionDiscoverer mockPartitionDiscoverer = mock(AbstractPartitionDiscoverer.class);
 
 Review comment:
   great idea. done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

