Hi,

Is there a way to write each group to its own file using the DataSet API
(batch)?

For example, let's use the following class:

case class Product(name: String, category: String)

And the following DataSet:

val products = env.fromElements(
  Product("i7", "cpu"), Product("R5", "cpu"),
  Product("gtx1080", "gpu"), Product("vega64", "gpu"),
  Product("evo250gb", "ssd"))

So, in this example, the output should be these three files:

- cpu.csv
i7, cpu
R5, cpu

- gpu.csv
gtx1080, gpu
vega64, gpu

- ssd.csv
evo250gb, ssd
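The target layout is simply "group by category, one file per group". As a local reference for checking a job's output, here is a plain-Scala sketch of that layout using only the standard library (no Flink involved). `PerCategoryFiles` and `writeGroups` are hypothetical names, and the `name, category` line format just mirrors the listing above.

```scala
import java.io.{File, PrintWriter}

case class Product(name: String, category: String)

// Hypothetical helper, not part of Flink: it only reproduces the desired output layout locally.
object PerCategoryFiles {
  // Writes one "<category>.csv" file per distinct category into `dir`,
  // one "name, category" line per product, keeping input order within each group.
  def writeGroups(products: Seq[Product], dir: File): Map[String, File] =
    products.groupBy(_.category).map { case (category, items) =>
      val file = new File(dir, s"$category.csv")
      val out = new PrintWriter(file, "UTF-8")
      try items.foreach(p => out.println(s"${p.name}, ${p.category}"))
      finally out.close()
      category -> file
    }
}
```

Calling `writeGroups` with the five products above produces `cpu.csv`, `gpu.csv` and `ssd.csv` in `dir`.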


I tried the following code, but it fails with
org.apache.flink.api.common.InvalidProgramException: Task not serializable.

products.groupBy("category").reduceGroup { group: Iterator[Product] =>
  val items = group.toSeq
  env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
  items
}
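A likely cause of the exception, assuming the function passed to `reduceGroup` captures `env`: Flink serializes user functions before shipping them to the workers, and `ExecutionEnvironment` is not serializable. The plain-Scala sketch below reproduces the same kind of failure with Java serialization alone; `FakeEnvironment` and `ClosureCheck` are hypothetical stand-ins, not Flink classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ExecutionEnvironment: like it, this class is not Serializable.
class FakeEnvironment {
  def describe(n: Int): String = s"env saw $n"
}

object ClosureCheck {
  // Tries plain Java serialization, roughly the check a UDF must pass before it is shipped.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def demo(): (Boolean, Boolean) = {
    val env = new FakeEnvironment
    val capturesEnv: Int => String = n => env.describe(n) // the local `env` becomes part of the closure
    val capturesNothing: Int => String = n => s"saw $n"    // no outer state is captured
    (isSerializable(capturesEnv), isSerializable(capturesNothing))
  }
}
```

In the Flink job, the analogous change is to keep `env` (and any `DataSet` construction) out of the function given to `reduceGroup`.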

Any input is welcome.

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
