Hi guys!

I have one input (from MongoDB) and I split the incoming data into multiple
datasets (each created dynamically from configuration). Before I write
back the result, I want to merge them into one dataset again, because there
is some common transformation to apply.
The flow is:

DataSet from MongoDB =>
Create mappers dynamically (currently 74), so I have 74 DataSets =>
Custom filter and mapping on each DataSet =>
Union dynamically into one (every mapper result has the same type) =>
Another common transformation =>
Count the result

But when I try to union more than 64 datasets, I get this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs.
    at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
    at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
    at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
    at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)

I tried splitting the incoming list of 74 datasets into groups of 60 + 14,
unioning each group, applying an id mapper to each group result, and then
unioning the group results, but without success:

val listOfDataSet: List[DataSet[...]] = ....

listOfDataSet
  .sliding(60, 60) // batches of 60 + 14
  .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
  // at this point there is an iterator of DataSet
  .reduce((dsg1, dsg2) => dsg1.union(dsg2)) // here I get the exception
  .map(finalDataSet => ... some transformation ...)
  .count()
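
To show the batching on its own, here is a minimal sketch on plain Scala collections. It does not use Flink at all: Set stands in for DataSet, and the names BatchedUnionSketch, batchSizes and batchedUnion are made up for this illustration. It only confirms that the grouping produces 60 + 14 and that the two-level union keeps every element; it says nothing about what the optimizer does with the real plan.

```scala
object BatchedUnionSketch {
  // Sizes of the batches that grouped(batchSize) produces for n inputs.
  def batchSizes(n: Int, batchSize: Int): List[Int] =
    List.fill(n)(()).grouped(batchSize).map(_.size).toList

  // Two-level union: union inside each batch, then union the batch results.
  // Set[A] is only a stand-in for DataSet[A]; `union` here is Set#union.
  // Assumes `parts` is non-empty, because reduce fails on an empty collection.
  def batchedUnion[A](parts: List[Set[A]], batchSize: Int): Set[A] =
    parts
      .grouped(batchSize)
      .map(batch => batch.reduce(_ union _))
      .reduce(_ union _)

  def main(args: Array[String]): Unit = {
    // 74 single-element sets, one per (hypothetical) mapper result.
    val parts = List.tabulate(74)(i => Set(i))
    println(batchSizes(parts.size, 60))   // List(60, 14)
    println(batchedUnion(parts, 60).size) // 74
  }
}
```

On plain collections, grouped(60) gives the same batches as sliding(60, 60).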

Is there any way to solve this?

Thanks
b0c1
