Hi,

Is there a way to write each group to its own file using the DataSet API (batch)?
For example, let's use the following class:

    case class Product(name: String, category: String)

And the following DataSet:

    val products = env.fromElements(
      Product("i7", "cpu"),
      Product("R5", "cpu"),
      Product("gtx1080", "gpu"),
      Product("vega64", "gpu"),
      Product("evo250gb", "ssd"))

So in this example my output should be these 3 files:

- cpu.csv
    i7, cpu
    R5, cpu
- gpu.csv
    gtx1080, gpu
    vega64, gpu
- ssd.csv
    evo250gb, ssd

I tried the following code, but got org.apache.flink.api.common.InvalidProgramException: Task not serializable.

    products.groupBy("category").reduceGroup { group: Iterator[Product] =>
      val items = group.toSeq
      env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
      items
    }

I welcome any of your inputs.

Thanks!
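A note on the error and one possible direction. The closure passed to reduceGroup refers to env, and the ExecutionEnvironment is not serializable, so Flink cannot ship the function to the workers; creating a new DataSet inside a user function is not supported either. A minimal sketch of an alternative, assuming the Flink 1.x DataSet API and a local or shared file system: route each category to a single task with partitionByHash and write through a custom OutputFormat that keeps one writer per category. PerCategoryCsvOutputFormat, WritePerCategory and the /tmp/products directory are hypothetical names made up for illustration, not part of Flink.

```scala
import java.io.{BufferedWriter, File, FileWriter}
import scala.collection.mutable

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

case class Product(name: String, category: String)

// Hypothetical helper (not a Flink class): writes every record to
// <outputDir>/<category>.csv, opening one writer per category on demand.
class PerCategoryCsvOutputFormat(outputDir: String) extends OutputFormat[Product] {

  // Writers are not serializable, so they are created on the worker in open().
  @transient private var writers: mutable.Map[String, BufferedWriter] = _

  override def configure(parameters: Configuration): Unit = ()

  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // Assumption: outputDir is on a file system the worker can write to.
    new File(outputDir).mkdirs()
    writers = mutable.Map.empty
  }

  override def writeRecord(record: Product): Unit = {
    val writer = writers.getOrElseUpdate(
      record.category,
      new BufferedWriter(new FileWriter(new File(outputDir, s"${record.category}.csv"))))
    writer.write(s"${record.name}, ${record.category}")
    writer.newLine()
  }

  override def close(): Unit = {
    if (writers != null) {
      writers.values.foreach(_.close())
      writers.clear()
    }
  }
}

object WritePerCategory {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val products = env.fromElements(
      Product("i7", "cpu"), Product("R5", "cpu"),
      Product("gtx1080", "gpu"), Product("vega64", "gpu"),
      Product("evo250gb", "ssd"))

    products
      .partitionByHash("category") // all records of one category reach the same task
      .output(new PerCategoryCsvOutputFormat("/tmp/products"))

    env.execute("write-per-category")
  }
}
```

The partitioning step matters: without it, two parallel tasks could open the same category file and overwrite each other's output. On a cluster, each worker would write to its own local disk unless the directory is on shared storage, so the sketch is best read as a starting point rather than a finished solution.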