Actually, I take it back. It is the final union that is causing the issue (the job being un-submittable). If I don't combineAtEnd, I can go higher (at least deploy the job), all the way up to 63.
After that, it starts failing with "too many open files" in RocksDB (which I can understand, and is at least better than silently not accepting my job):

Caused by: java.lang.RuntimeException: Error while opening RocksDB instance.
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
	... 4 more
Caused by: org.rocksdb.RocksDBException: IO error: /var/folders/l1/ncffkbq11_lg6tjk_3cvc_n00000gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-000001: Too many open files
	at org.rocksdb.RocksDB.open(Native Method)
	at org.rocksdb.RocksDB.open(RocksDB.java:239)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
	... 6 more

> On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh
> <abhis...@tetrationanalytics.com> wrote:
>
> Is there a limit on how many DataStreams can be defined in a streaming
> program?
>
> It looks like Flink has problems handling too many data streams. I simplified my
> topology further. For example, this works (parallelism of 4):
>
> <PastedGraphic-2.png>
>
> However, when I try to go beyond 51 (found empirically by parametrizing
> nParts), it barfs again. Submission fails, and it asks me to increase
> akka.client.timeout.
>
> Here is the reduced code for the repro (the union at the end is not itself the
> issue; it is the parallelism of the first for loop):
>
> int nParts = cfg.getInt("dummyPartitions", 4);
> boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);
>
> // create lots of streams
> List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>   streams.add(env
>       .readFile(
>           new TextInputFormat(new Path("/tmp/input")),
>           "/tmp/input",
>           FileProcessingMode.PROCESS_CONTINUOUSLY,
>           1000,
>           FilePathFilter.createDefaultFilter())
>       .setParallelism(1).name("src"));
> }
>
> if (combineAtEnd) {
>   DataStream<String> combined = streams.get(0);
>   for (int i = 1; i < nParts; i++) {
>     combined = combined.union(streams.get(i));
>   }
>   combined.print().setParallelism(1);
> } else { // die parallel
>   for (int i = 1; i < nParts; i++) {
>     streams.get(i).print();
>   }
> }
>
>> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh
>> <abhis...@tetrationanalytics.com> wrote:
>>
>> I even made it 10 minutes:
>>
>> akka.client.timeout: 600s
>>
>> But it doesn't feel like it is taking effect. It still comes out at about the
>> same time, with the same error.
>>
>> -Abhishek-
>>
>>> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh
>>> <abhis...@tetrationanalytics.com> wrote:
>>>
>>> Yes, I had increased it to 5 minutes. It just sits there and bails out
>>> again.
>>>
>>>> On Jan 23, 2017, at 1:47 AM, Jonas <jo...@huntun.de> wrote:
>>>>
>>>> The exception says that
>>>>
>>>> Did you already try that?
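
PS: in case it helps anyone reducing this further — if I remember the API right, DataStream.union() is variadic, so the chained-union loop in the repro above can be collapsed into a single call. A rough sketch (the class and method names are mine, and it assumes the same `streams` list built in the repro; the resulting job graph should be equivalent):

import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;

public class UnionAtEndSketch {

    // Merge a list of streams with one union() call instead of chaining
    // union() in a loop. Intended to be behavior-preserving; it only makes
    // the "combine at end" step easier to read.
    @SuppressWarnings("unchecked")
    static DataStream<String> combineAtEnd(List<? extends DataStream<String>> streams) {
        DataStream<String> first = streams.get(0);
        DataStream<String>[] rest =
                streams.subList(1, streams.size()).toArray(new DataStream[0]);
        // Guard against a single-stream list, where there is nothing to union.
        return rest.length == 0 ? first : first.union(rest);
    }
}

Either way the number of sources in the job graph stays the same, so I would not expect this to change the akka.client.timeout behavior — it only tidies the combineAtEnd branch.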