Hello everybody, as I'm getting familiar with Flink I've found a possible improvement to the Scala APIs: in Scala it's a common pattern to extract tuple fields via pattern matching, which makes functions that work on tuples more readable, like this:
    // referring to the mail count example in the training,
    // assuming `mails` is a DataSet[(String, String)]:
    // a pair of a date and a string with username and email
    val monthsAndEmails = mails.map {
      case (date, sender) => (extractMonth(date), extractEmail(sender))
    }

However, this is not possible with the Scala APIs because of the overloading of the `map` function in the `DataSet` and `DataStream` classes (along with other higher-order functions such as `flatMap` and `filter`). My understanding is that the main reason for having two overloaded methods is to support `RichFunction`s. I've found out there has been some interest around the issue in the past ([FLINK-1159] <https://issues.apache.org/jira/browse/FLINK-1159>).

In the past couple of days my colleague Andrea and I have tried several ways to address the problem, arriving at two possible solutions:

1. don't overload and use different names, e.g. `map` taking a Scala function and `mapWith` taking a Flink `MapFunction`
2. keep only the method taking a Scala function (which would be more idiomatic from a Scala perspective, IMHO) and provide an implicit conversion from the Flink function to the Scala function within the `org.apache.flink.api.scala` package object

We've also evaluated several other approaches using union types and type classes, but we've found them to be too complex. Regarding the two approaches above, the first implies a breaking change to the APIs, while the second is giving me a hard time figuring out some compilation errors in `flink-libraries` and `flink-contrib`; moreover, when we tested it, `RichMapFunction`s lost their state (possibly because of the double conversion, first to a Scala function, then to a plain `MapFunction`).
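To make the first solution concrete, here is a minimal, Flink-free sketch of the idea: `DataSet` and `MapFunction` below are simplified stand-ins for the real Flink types (holding data eagerly, just for illustration), and the method names mirror the proposal. With only one `map`, the compiler can infer the type of a pattern-matching lambda, while Java-style functions go through the separately named `mapWith`:

```scala
// Stand-in for org.apache.flink.api.common.functions.MapFunction
trait MapFunction[T, O] {
  def map(value: T): O
}

// Simplified stand-in for the Scala DataSet, eager for illustration only
class DataSet[T](val data: Seq[T]) {
  // Only one `map`, taking a Scala function: `case` lambdas now compile
  def map[O](fun: T => O): DataSet[O] = new DataSet(data.map(fun))

  // Flink-style functions get their own, differently named method
  def mapWith[O](mapper: MapFunction[T, O]): DataSet[O] =
    new DataSet(data.map(mapper.map))
}

object Demo {
  val mails = new DataSet(Seq(("2015-12-01", "Stefano <stefano@example.org>")))

  // Tuple extraction via pattern matching, as in the original example;
  // with the overload gone there is no ambiguity to resolve
  val monthsAndEmails: DataSet[(String, String)] =
    mails.map { case (date, sender) =>
      (date.take(7), sender.dropWhile(_ != '<'))
    }

  // A Flink-style MapFunction goes through `mapWith` instead
  val viaMapWith: DataSet[String] =
    mails.mapWith(new MapFunction[(String, String), String] {
      def map(value: (String, String)): String = value._1
    })

  def main(args: Array[String]): Unit = {
    println(monthsAndEmails.data)
    println(viaMapWith.data)
  }
}
```

This is only a sketch of the signatures, not the actual patch; the real change would touch the Scala `DataSet`/`DataStream` wrappers and keep `RichFunction` support on the `mapWith` side.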
You can have a look at the code I've written so far here (last 2 commits): https://github.com/radicalbit/flink/commits/1159

After a little exchange of ideas, we think the first solution would be easier to implement and also interesting from the standpoint of the ergonomics of the API (e.g. `line mapWith new LineSplitter`), and we'd like to gather some feedback on the feasibility of this change. Would this still be regarded as a relevant improvement? What do you think about it? Do you think there's time to work on it before the 1.0 release? What do you think about introducing breaking changes to make this pattern available to Scala users?

Thank you all in advance for your feedback.

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit