Sorry, I think I misunderstood the issue. It seems that DataStream partitions
the data by some field, and when I select on that field, only one TaskManager
processes the data. I get the same result when I use a filter. Below is
the code:
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

// One row of the clickstream TSV file
case class WikiData(prevID: Option[Int], curID: Int, num: Int,
                    prevTitle: String, curTitle: String, ttype: String)

object StreamingSelect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    //val rootPath = "gs://cpcflink/wikistream/"

    // Read the TSV file line by line
    val stream: DataStream[String] = env.createInput(
      new TextInputFormat(new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))

    // Parse each tab-separated line; the first column (prevID) may be empty
    // and the sixth column (type) may be missing
    val wikiStream = stream.map { line =>
      val values = line.split("\t")
      WikiData(
        if (values(0).isEmpty) None else Some(Integer.parseInt(values(0))),
        Integer.parseInt(values(1)),
        Integer.parseInt(values(2)),
        values(3),
        values(4),
        if (values.length < 6) null else values(5)
      )
    }

    // Tag records with curID 14533 as output "14533"; all other records get no tag
    val split = wikiStream
      .split(i => if (i.curID == 14533) List("14533") else List.empty)
    // Keep only the "14533" output and write (curID, num) pairs as CSV
    val stream14533 = split.select("14533").map(i => (i.curID, i.num))
    stream14533.writeAsCsv("/home/capacman/Data/wiki/14533")

    env.execute()
  }
}
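To check that split/select keeps exactly the same records as a plain filter, the selection logic can be modelled on a local Scala collection without Flink. This is a minimal sketch under that assumption: `parseLine`, `tags`, `select` and `filter14533` are hypothetical helpers that mirror the lambdas in the job above, and a `Seq` stands in for the DataStream, so it says nothing about how Flink distributes the work across TaskManagers.

```scala
// Same shape as the case class in the job above
case class WikiData(prevID: Option[Int], curID: Int, num: Int,
                    prevTitle: String, curTitle: String, ttype: String)

object SplitVsFilter {
  // Hypothetical helper: the same parsing logic as the map() in the job
  def parseLine(line: String): WikiData = {
    val values = line.split("\t") // leading empty fields are kept by split
    WikiData(
      if (values(0).isEmpty) None else Some(Integer.parseInt(values(0))),
      Integer.parseInt(values(1)),
      Integer.parseInt(values(2)),
      values(3),
      values(4),
      if (values.length < 6) null else values(5))
  }

  // Hypothetical helper: the output selector passed to split(),
  // returning the names of the outputs a record belongs to
  def tags(i: WikiData): List[String] =
    if (i.curID == 14533) List("14533") else List.empty

  // Emulates split(...).select(name): keep records tagged with `name`
  def select(data: Seq[WikiData], name: String): Seq[WikiData] =
    data.filter(i => tags(i).contains(name))

  // The equivalent single filter
  def filter14533(data: Seq[WikiData]): Seq[WikiData] =
    data.filter(_.curID == 14533)
}
```

Both functions return the same records in the same order; which TaskManager ends up processing them is decided by Flink's scheduling, which a local collection cannot show.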
On 7 June 2016 at 19:26, CPC <[email protected]> wrote:
> Hello everyone,
>
> When I use DataStream split/select, it always sends all selected records to
> the same TaskManager. Is there any reason for this behaviour? Also, is it
> possible to implement the same split/select behaviour for the DataSet API
> (without using a different filter for every output)? I found
> https://issues.apache.org/jira/browse/FLINK-87, but it is still
> open...
>
> Thanks...
>
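For comparison only: on an in-memory Scala collection, the routing that split/select performs can be done in a single pass by tagging each record with its output names and grouping by tag, so no separate filter per output is needed. This is a sketch under that assumption and is not Flink DataSet API code; `routeByTags` is a hypothetical name, and whether DataSet offers an equivalent is the open question raised above (FLINK-87).

```scala
object MultiOutput {
  // Hypothetical helper: sends every record to each output named by `selector`,
  // in one pass over the input, like an output selector in split/select
  def routeByTags[T](data: Seq[T])(selector: T => Seq[String]): Map[String, Seq[T]] =
    data
      .flatMap(r => selector(r).map(tag => (tag, r))) // one (tag, record) pair per output
      .groupBy(_._1)                                  // collect pairs per output name
      .map { case (tag, pairs) => (tag, pairs.map(_._2)) }
}
```

A record can land in several outputs or in none, matching the semantics of a selector that returns a list of output names.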