[ 
https://issues.apache.org/jira/browse/KAFKA-5345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5345:
----------------------------------
    Fix Version/s: 0.10.2.2

> Some socket connections not closed after restart of Kafka Streams
> -----------------------------------------------------------------
>
>                 Key: KAFKA-5345
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5345
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: MacOs 10.12.5 and Ubuntu 14.04
>            Reporter: Jeroen van Wilgenburg
>            Assignee: Rajini Sivaram
>             Fix For: 0.11.0.0, 0.10.2.2
>
>
> We ran into a problem that resulted in a "Too many open files" exception 
> because some sockets are not closed after a restart.
> This problem only occurs with version {{0.10.2.1}} and {{0.10.2.0}}. 
> {{0.10.1.1}} and {{0.10.1.0}} both work as expected.
> I used the same version for the server and client.
> I used https://github.com/kohsuke/file-leak-detector to display the open file 
> descriptors. The culprit was :
> {noformat}
> #146 socket channel by thread:pool-2-thread-1 on Mon May 29 11:20:47 CEST 2017
>       at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:108)
>       at 
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
>       at java.nio.channels.SocketChannel.open(SocketChannel.java:145)
>       at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
>       at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
>       at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsKafkaClient.ensureOneNodeIsReady(StreamsKafkaClient.java:195)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsKafkaClient.getAnyReadyBrokerId(StreamsKafkaClient.java:233)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsKafkaClient.checkBrokerCompatibility(StreamsKafkaClient.java:300)
>       at 
> org.apache.kafka.streams.KafkaStreams.checkBrokerVersionCompatibility(KafkaStreams.java:401)
>       at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:425)
> {noformat}    
>       
>       
> I could narrow the problem down to a reproducable example below (the only 
> dependency is 
> {{org.apache.kafka:kafka-streams:jar:0.10.2.1}}). 
> *IMPORTANT*: You have to run this code in the Intellij IDEA debugger with a 
> special breakpoint to see it fail. 
> See the comments on the socketChannels variable on how to add this 
> breakpoint. 
> When you run this code you will see the number of open SocketChannels 
> increase (only on version 0.10.2.x).
>       
> {code:title=App.java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import java.nio.channels.SocketChannel;
> import java.nio.channels.spi.AbstractInterruptibleChannel;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Properties;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> public class App {
>     private static KafkaStreams streams;
>     private static String brokerList;
>     // Fill socketChannels with entries on line 'Socket socket = 
> socketChannel.socket();' (line number 170  on 0.10.2.1)
>     // of org.apache.kafka.common.network.Selector: Add breakpoint, right 
> click on breakpoint.
>     // - Uncheck 'Suspend'
>     // - Check 'Evaluate and log' and fill text field with (without quotes) 
> 'App.socketChannels.add(socketChannel)'
>     private static final List<SocketChannel> socketChannels = new 
> ArrayList<>();
>     public static void main(String[] args) {
>         brokerList = args[0];
>         init();
>         ScheduledExecutorService scheduledThreadPool = 
> Executors.newScheduledThreadPool(1);
>         Runnable command = () -> {
>             streams.close();
>             System.out.println("Open socketChannels: " + 
> socketChannels.stream()
>                     .filter(AbstractInterruptibleChannel::isOpen)
>                     .collect(Collectors.toList()).size());
>             init();
>         };
>         scheduledThreadPool.scheduleWithFixedDelay(command, 10000L, 2000, 
> TimeUnit.MILLISECONDS);
>     }
>     private static void init() {
>         Properties streamsConfiguration = new Properties();
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "JeroenApp");
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
> brokerList);
>         StreamsConfig config = new StreamsConfig(streamsConfiguration);
>         KStreamBuilder builder = new KStreamBuilder();
>         KStream<String, String> stream = builder.stream(Serdes.String(), 
> Serdes.String(), "HarrieTopic");
>         stream.foreach((key, value) -> System.out.println(value));
>         streams = new KafkaStreams(builder, config);
>         streams.start();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to