Actually, I take it back. It is the last union that is causing the issue (the job
being un-submittable). If I don’t combineAtEnd, I can go higher (at least
deploy the job), all the way up to 63.
After that it starts failing with too many open files in RocksDB (which I can
understand, and which is at least better than silently not accepting my job):
Caused by: java.lang.RuntimeException: Error while opening RocksDB instance.
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:306)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:821)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:118)
    ... 4 more
Caused by: org.rocksdb.RocksDBException: IO error: /var/folders/l1/ncffkbq11_lg6tjk_3cvc_n00000gn/T/flink-io-45a78866-a9da-40ca-be51-a894c4fac9be/3815eb68c3777ba4f504e8529db6e145/StreamSource_39_0/dummy_state/7ff48c49-b6ce-4de8-ba7e-8a240b181ae2/db/MANIFEST-000001: Too many open files
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:239)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:304)
    ... 6 more
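As a side note on the "Too many open files" failure: each RocksDB instance holds a number of file descriptors, so one way to get headroom is to raise the per-process open-file limit in the shell that launches the TaskManagers. A minimal sketch, assuming the value 4096 is enough (it is purely illustrative; size it for your actual number of RocksDB instances):

```shell
# Print the current soft limit on open file descriptors for this shell.
ulimit -n
# Raise the soft limit for this shell session (only succeeds if it stays
# at or below the hard limit). 4096 is an illustrative value, not a
# recommendation from the thread.
ulimit -n 4096
```

If the soft limit cannot be raised because the hard limit itself is too low, the hard limit has to be increased at the system level (for example via /etc/security/limits.conf on Linux).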
> On Jan 23, 2017, at 8:20 AM, Abhishek R. Singh
> <[email protected]> wrote:
>
> Is there a limit on how many DataStreams can be defined in a streaming
> program?
>
> Looks like Flink has problems handling too many data streams? I simplified my
> topology further. For example, this works (parallelism of 4):
>
> <PastedGraphic-2.png>
>
> However, when I try to go beyond 51 (found empirically by parametrizing
> nParts), it barfs again. Submission fails, and the error asks me to increase
> akka.client.timeout.
>
> Here is the reduced code for the repro. The union at the end is not itself the
> issue; it is the parallelism of the first for loop:
> int nParts = cfg.getInt("dummyPartitions", 4);
> boolean combineAtEnd = cfg.getBoolean("dummyCombineAtEnd", true);
>
> // create lots of single-parallelism file sources
> List<SingleOutputStreamOperator<String>> streams = new ArrayList<>(nParts);
> for (int i = 0; i < nParts; i++) {
>   streams.add(env
>       .readFile(
>           new TextInputFormat(new Path("/tmp/input")),
>           "/tmp/input",
>           FileProcessingMode.PROCESS_CONTINUOUSLY,
>           1000,
>           FilePathFilter.createDefaultFilter())
>       .setParallelism(1).name("src"));
> }
>
> if (combineAtEnd) {
>   // union all sources into one stream and print it
>   DataStream<String> combined = streams.get(0);
>   for (int i = 1; i < nParts; i++) {
>     combined = combined.union(streams.get(i));
>   }
>   combined.print().setParallelism(1);
> } else { // die parallel: print every stream (start at 0 so none is skipped)
>   for (int i = 0; i < nParts; i++) {
>     streams.get(i).print();
>   }
> }
>
>
>> On Jan 23, 2017, at 6:14 AM, Abhishek R. Singh
>> <[email protected]> wrote:
>>
>> I even made it 10 minutes:
>>
>> akka.client.timeout: 600s
>>
>> But it doesn’t feel like it is taking effect. It still fails at about the
>> same time with the same error.
>>
>> -Abhishek-
>>
>>> On Jan 23, 2017, at 6:04 AM, Abhishek R. Singh
>>> <[email protected]> wrote:
>>>
>>> Yes, I had increased it to 5 minutes. It just sits there and then bails out
>>> again.
>>>
>>>> On Jan 23, 2017, at 1:47 AM, Jonas <[email protected]> wrote:
>>>>
>>>> The exception says that
>>>>
>>>> Did you already try that?
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/weird-client-failure-timeout-tp11201p11204.html
>>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>>
>>
>