I changed the max number of open files and got past this error, but now I'm
seeing errors that it's unable to flush the file. I am checkpointing to
HDFS; should I be using the local file system instead?
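For reference, a minimal sketch of the kind of backend setup in question,
assuming the Flink 1.2 Java API; the class name, namenode address and
checkpoint path are placeholders, not my actual config:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints (and RocksDB snapshots) go to HDFS; RocksDB's working
        // files still live in each TaskManager's local temp directories.
        // Host, port and path below are placeholders.
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));

        // 50 second checkpoint interval, as in the original post.
        env.enableCheckpointing(50000);

        // ... job definition and env.execute() would follow here.
    }
}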

Is there any better way to use CEP with multiple patterns, or are you
suggesting I write my own pattern matching?
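In case it helps the discussion, here is a minimal sketch of what I
understand by multiplexing many patterns into one stateful function. It
assumes each pattern can be reduced to a start predicate and an end
predicate; the Event and Rule types and the two-step match logic are made
up for illustration, not taken from my job:

import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Placeholder input type; stands in for whatever the keyed stream carries. */
interface Event {
    String getKey();
}

/** Placeholder for one pattern, reduced to a start and an end predicate. */
interface Rule {
    String getId();
    boolean matchesStart(Event e);
    boolean matchesEnd(Event e);
}

/**
 * One keyed function that evaluates all rules, so the job needs a single
 * stateful operator instead of one CEP operator (and one RocksDB backend)
 * per pattern.
 */
public class MultiRuleMatcher extends RichFlatMapFunction<Event, String> {

    private final List<Rule> rules;
    private transient ValueState<HashMap<String, String>> progress; // ruleId -> matched step

    public MultiRuleMatcher(List<Rule> rules) {
        this.rules = rules;
    }

    @Override
    public void open(Configuration parameters) {
        progress = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "rule-progress",
                TypeInformation.of(new TypeHint<HashMap<String, String>>() {})));
    }

    @Override
    public void flatMap(Event event, Collector<String> out) throws Exception {
        HashMap<String, String> state = progress.value();
        if (state == null) {
            state = new HashMap<>();
        }
        for (Rule rule : rules) {
            // Toy two-step pattern: remember that the start condition fired,
            // then emit a match once the end condition fires for the same key.
            if (rule.matchesStart(event)) {
                state.put(rule.getId(), "started");
            } else if ("started".equals(state.get(rule.getId())) && rule.matchesEnd(event)) {
                out.collect("Rule " + rule.getId() + " matched for key " + event.getKey());
                state.remove(rule.getId());
            }
        }
        progress.update(state);
    }
}

Applied as keyedStream.flatMap(new MultiRuleMatcher(rules)), the progress
map lives in keyed state, so all rules share one state object per key
instead of one RocksDB instance per pattern. Is that roughly what you had
in mind?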

On Fri, Apr 28, 2017, 11:44 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> The problem here is that this will try to open 300 RocksDB instances on
> each of the TMs (depending on how the parallelism is spread between the
> machines this could be more or less). As the exception says, this will open
> too many files because each RocksDB instance has a directory with several
> files in it.
>
> One possible solution would be to increase the limit on open files, but I
> don’t think that opening 300 RocksDB instances on one machine is a good
> idea for any size of machine. I think with this many patterns you could
> start thinking about writing the pattern matching yourself and multiplexing
> the patterns in one stateful function or operator.
>
> @Stefan, what do you think about having this many Rocks instances?
>
> Best,
> Aljoscha
>
> > On 28. Apr 2017, at 17:05, mclendenin <marcusc...@gmail.com> wrote:
> >
> > Starting ~300 CEP patterns with a parallelism of 6, since there are 6
> > partitions on the Kafka topic. Checkpointing using RocksDB to Hadoop on an
> > interval of 50 seconds. The cluster is HA with 2 JMs and 5 TMs. Getting the
> > following exception:
> >
> >
> > java.io.IOException: Error creating ColumnFamilyHandle.
> >        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:830)
> >        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createValueState(RocksDBKeyedStateBackend.java:838)
> >        at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:251)
> >        at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
> >        at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
> >        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:248)
> >        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:557)
> >        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:542)
> >        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.open(AbstractKeyedCEPPatternOperator.java:102)
> >        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
> >        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> >        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> >        at java.lang.Thread.run(Thread.java:745)
> > Caused by: org.rocksdb.RocksDBException: IO error: /flink/tmp/flink-io-c60bed30-5ca4-4eed-b5b4-14f3c945a46a/job-a150d7d59aafadcf922f2f397c59d6d1_op-KeyedCEPPatternOperator_874_3_uuid-3f3fea55-1af6-43fe-8e20-b213a8e06d28/db/MANIFEST-000006: Too many open files
> >        at org.rocksdb.RocksDB.createColumnFamily(Native Method)
> >        at org.rocksdb.RocksDB.createColumnFamily(RocksDB.java:1323)
> >        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getColumnFamily(RocksDBKeyedStateBackend.java:823)
> >        ... 12 more
> >
> >
> >
> >
>
>
