Actually, I take it back. It is the last union that is causing the issue (of 
the job being un-submittable). If I don't combineAtEnd, I can go higher (at 
least deploy the job), all the way up to 63. 

After that it starts failing with "too many open files" in RocksDB (which I 
can understand, and is at least better than silently not accepting my job).

Caused by: java.lang.RuntimeException: Error while opening RocksDB instance.
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
        ... 4 more
Caused by: org.rocksdb.RocksDBException: IO error: /var/folders/l1/ncffkbq11_lg6tjk_3cvc_n00000gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-000001: Too many open files
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:239)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
        ... 6 more
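
The math makes sense: each source subtask opens its own embedded RocksDB 
instance, and every instance holds descriptors for its table and manifest 
files, so 63 sources can blow past the default per-process limit. Raising the 
OS limit (ulimit -n) is one fix; RocksDB itself can also be capped. A minimal 
sketch, assuming the OptionsFactory hook on RocksDBStateBackend and RocksDB's 
DBOptions.setMaxOpenFiles (check the exact signatures against your Flink 
version; the checkpoint path here is made up):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// The constructor may declare IOException in your version.
RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints");
backend.setOptions(new OptionsFactory() {
  @Override
  public DBOptions createDBOptions(DBOptions currentOptions) {
    // Default is -1 (unbounded); a positive cap makes RocksDB close and
    // reopen table files instead of keeping a descriptor for each one.
    return currentOptions.setMaxOpenFiles(64);
  }

  @Override
  public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
    return currentOptions;
  }
});
// env is the StreamExecutionEnvironment from the job below.
env.setStateBackend(backend);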



> On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh 
> <abhis...@tetrationanalytics.com> wrote:
> 
> Is there a limit on how many DataStreams can be defined in a streaming 
> program?
> 
> It looks like Flink has problems handling too many data streams. I have 
> simplified my topology further. For example, this works (parallelism of 4):
> 
> [attachment: PastedGraphic-2.png — screenshot of the simplified topology]
> 
> However, when I try to go beyond 51 (found empirically by parametrizing 
> nParts), it barfs again. Submission fails, and the error asks me to increase 
> akka.client.timeout.
> 
> Here is the reduced code for the repro (the union at the end is not itself 
> the issue; it is the number of streams created by the first for loop):
> int nParts = cfg.getInt("dummyPartitions", 4);
> boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);
> 
> // create lots of streams
> List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>   streams.add(env
>       .readFile(
>           new TextInputFormat(new Path("/tmp/input")),
>           "/tmp/input",
>           FileProcessingMode.PROCESS_CONTINUOUSLY,
>           1000,
>           FilePathFilter.createDefaultFilter())
>       .setParallelism(1).name("src"));
> }
> 
> if (combineAtEnd) {
>   DataStream<String> combined = streams.get(0);
>   for (int i = 1; i < nParts; i++) {
>     combined = combined.union(streams.get(i));
>   }
>   combined.print().setParallelism(1);
> } else { // die parallel
>   for (int i = 0; i < nParts; i++) { // start at 0 so the first stream also prints
>     streams.get(i).print();
>   }
> }
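> 
> (Aside: union is variadic, so the chain above can be collapsed into a single 
> call. A sketch, assuming the varargs overload; purely cosmetic, since the 
> resulting topology is the same:)
> 
> DataStream<String> first = streams.get(0);
> @SuppressWarnings("unchecked")
> DataStream<String>[] rest =
>     streams.subList(1, nParts).toArray(new DataStream[0]);
> first.union(rest).print().setParallelism(1);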
> 
> 
>> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh 
>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>> wrote:
>> 
>> I even made it 10 minutes:
>> 
>> akka.client.timeout: 600s
>> 
>> But it doesn't feel like it is taking effect. It still bails out at about 
>> the same time with the same error.
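>> 
>> One way to rule out a config problem is to print what the client-side 
>> configuration actually resolves. A minimal sketch, assuming Flink 1.2+'s 
>> GlobalConfiguration.loadConfiguration(), which reads flink-conf.yaml from 
>> FLINK_CONF_DIR (older versions expose the configuration differently):
>> 
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.configuration.GlobalConfiguration;
>> 
>> // Prints the timeout the client would actually use
>> // ("60 s" is the documented default).
>> Configuration conf = GlobalConfiguration.loadConfiguration();
>> System.out.println(conf.getString("akka.client.timeout", "60 s"));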
>> 
>> -Abhishek-
>> 
>>> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh 
>>> <abhis...@tetrationanalytics.com <mailto:abhis...@tetrationanalytics.com>> 
>>> wrote:
>>> 
>>> Yes, I had increased it to 5 minutes. It just sits there and then bails 
>>> out again.
>>> 
>>>> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de 
>>>> <mailto:jo...@huntun.de>> wrote:
>>>> 
>>>> The exception says that you should increase akka.client.timeout.
>>>> 
>>>> Did you already try that?
>>>> 
>>> 
>> 
> 
