Hi all,

I'm reading a large number of small files from HDFS in batch mode (about 20 directories, each containing about 3000 files, enumerated with recursive.file.enumeration=true).
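For context, the source is set up roughly like this (a simplified sketch; the path is a placeholder, and the actual job uses a custom input format, MyTextInputFormat, which extends DelimitedInputFormat, as seen in the traces below):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ReadSmallFilesJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Enable recursive enumeration so the ~3000 files under each
        // directory are picked up by the file input format.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        DataSet<String> lines = env.readTextFile("hdfs:///filepath")
                                   .withParameters(parameters);

        lines.first(10).print(); // placeholder for the real processing
    }
}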
Each time, after about 200 GB of received data, the job fails with the following exception:

java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
    at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
    at java.io.FilterInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
    at java.util.zip.CheckedInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
    at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
    at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
    ... 5 more

I checked the file each time: it exists and is healthy. Looking at the TaskManager logs, I found the following exceptions, which suggest the job is running out of connections:

2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
java.net.SocketException: No buffer space available (maximum connections reached?): connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
    at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
    at java.io.FilterInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
    at java.util.zip.CheckedInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
    at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
    at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
    at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Unknown Source)

2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.DFSClient - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue.
java.net.SocketException: No buffer space available (maximum connections reached?): connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.Net.connect(Unknown Source)
    at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
    at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
    at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
    at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
    at java.io.FilterInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
    at java.util.zip.CheckedInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
    at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
    at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at java.util.zip.GZIPInputStream.<init>(Unknown Source)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
    at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
    at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
    at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Unknown Source)

I inspected the open connections and found that the job opens a very large number of connections that remain stuck in the CLOSE_WAIT state, which I guess exhausts the ephemeral port space after some time.

I'm running Flink 1.1.2 on Windows 10 (a single node with 1 TaskManager), with a job parallelism of 8. I get the same exception even with the parallelism set to 1, and the same thing happens after upgrading to Flink 1.1.3.

Any idea what the root cause of the problem could be and how to solve it?

Thank you.

Best,
Yassine
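P.S. For reference, these are roughly the commands I used for the checks mentioned above (the path is a placeholder). To verify that the file is healthy:

hdfs fsck /filepath/filename.csv.gz -files -blocks -locations

And to inspect the connections stuck in CLOSE_WAIT on the Windows machine:

netstat -ano | findstr CLOSE_WAIT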