Sorry, I think I misunderstood the issue. But it seems DataStream partitions the data by some field, and when I select that field only one taskmanager processes the data. I can achieve the same result when I use filter. Below is the code piece:
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

// One record of the Wikipedia clickstream TSV dump.
// prevID is absent (None) when the source column is empty; ttype is null
// when the row has fewer than six columns (kept as-is to preserve behavior).
case class WikiData(prevID: Option[Int], curID: Int, num: Int, prevTitle: String, curTitle: String, ttype: String)

/**
 * Reads the clickstream TSV, routes records for curID 14533 through
 * DataStream split/select, and writes (curID, num) pairs as CSV.
 */
object StreamingSelect {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //val rootPath = "gs://cpcflink/wikistream/"
    val stream: DataStream[String] =
      env.createInput(new TextInputFormat(new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))

    // Parse each tab-separated line into a WikiData record.
    val wikiStream = stream.map { line =>
      val cols = line.split("\t")
      WikiData(
        if (cols(0).isEmpty) None else Some(Integer.parseInt(cols(0))),
        Integer.parseInt(cols(1)),
        Integer.parseInt(cols(2)),
        cols(3),
        cols(4),
        // NOTE(review): null sentinel for a missing sixth column;
        // Option[String] would be more idiomatic Scala.
        if (cols.length < 6) null else cols(5)
      )
    }

    // Tag records with curID 14533; everything else gets no output name
    // and is dropped by the select below.
    val tagged = wikiStream.split(record => if (record.curID == 14533) List("14533") else List.empty)

    val selected14533 = tagged.select("14533").map(record => (record.curID, record.num))
    selected14533.writeAsCsv("/home/capacman/Data/wiki/14533")

    env.execute()
  }
}

On 7 June 2016 at 19:26, CPC <acha...@gmail.com> wrote: > Hello everyone, > > When i use DataStream split/select,it always send all selected records to > same taskmanager. Is there any reason for this behaviour? Also is it > possible to implement same split/select behaviour for DataSet api(without > using a different filter for every output )? I found this > https://issues.apache.org/jira/browse/FLINK-87 issue but it is still > open... > > Thanks... >