Sorry, I think I misunderstood the issue. But it seems the DataStream partitions
the data by some field, and when I select on that field only one TaskManager
ends up processing the data. I can achieve the same result when I use a filter.
Below is the code piece:


import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

/** One parsed row of the Wikipedia clickstream TSV file.
  *
  * @param prevID    source article ID; `None` when the first column is empty
  * @param curID     target article ID
  * @param num       occurrence count for the (prev, cur) pair
  * @param prevTitle source article title
  * @param curTitle  target article title
  * @param ttype     link type; `null` when the input row has fewer than 6 columns
  *                  (kept as a null sentinel to match the parsing code)
  */
final case class WikiData(prevID: Option[Int], curID: Int, num: Int,
                          prevTitle: String, curTitle: String, ttype: String)

object StreamingSelect {

  /** Parses one tab-separated clickstream line into a [[WikiData]] record.
    * Expected column layout: prevID \t curID \t num \t prevTitle \t curTitle \t type.
    */
  private def parseLine(line: String): WikiData = {
    val values = line.split("\t")
    WikiData(
      // The first column is blank for rows whose referrer is not an article.
      if (values(0).isEmpty) None else Some(values(0).toInt),
      values(1).toInt,
      values(2).toInt,
      values(3),
      values(4),
      // The type column is missing on some rows; lift avoids the manual
      // length check while keeping the original null sentinel.
      values.lift(5).orNull
    )
  }

  /** Reads the clickstream file, selects records for curID 14533 via
    * split/select, and writes the (curID, num) pairs as CSV.
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    //val rootPath = "gs://cpcflink/wikistream/"
    val stream: DataStream[String] = env.createInput(
      new TextInputFormat(
        new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))

    val wikiStream = stream.map(parseLine _)

    // NOTE(review): split/select does not repartition the stream — selected
    // records stay on whichever subtask produced them, which is why a single
    // hot key can appear to run on one TaskManager. Newer Flink versions
    // deprecate split/select in favour of side outputs.
    val split = wikiStream
      .split(i => if (i.curID == 14533) List("14533") else List.empty)
    val stream14533 = split.select("14533").map(i => (i.curID, i.num))

    stream14533.writeAsCsv("/home/capacman/Data/wiki/14533")

    env.execute()
  }
}

On 7 June 2016 at 19:26, CPC <acha...@gmail.com> wrote:

> Hello everyone,
>
> When I use DataStream split/select, it always sends all selected records to
> the same TaskManager. Is there any reason for this behaviour? Also, is it
> possible to implement the same split/select behaviour for the DataSet API
> (without using a different filter for every output)? I found this
> https://issues.apache.org/jira/browse/FLINK-87 issue but it is still
> open...
>
> Thanks...
>

Reply via email to