Hello everybody,

as I'm getting familiar with Flink I've found a possible improvement to the
Scala APIs: in Scala it's a common pattern to perform tuple extraction
using pattern matching, which makes functions that work on tuples more
readable, like this:

// referring to the mail count example in the training
// assuming `mails` is a DataSet[(String, String)]
// a pair of date and a string with username and email
val monthsAndEmails =
  mails.map {
    case (date, sender) =>
      (extractMonth(date), extractEmail(sender))
  }

However, this is not possible with the Scala APIs because of the
overloading of the `map` function in the `DataSet` and `DataStream` classes
(along with other higher-order functions such as `flatMap` and `filter`). My
understanding is that the main reason for having two overloaded
functions is to provide support for `RichFunction`s.
I've found out there has been some interest around the issue in the past (
[FLINK-1159] <https://issues.apache.org/jira/browse/FLINK-1159>).
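
To make the problem concrete, here is a minimal, Flink-free reproduction
(the `Box` class and this `MapFunction` trait are hypothetical stand-ins
for `DataSet`/`DataStream` and Flink's own interface):

```scala
// Stand-in for Flink's function interface
trait MapFunction[T, R] { def map(value: T): R }

// Stand-in for DataSet/DataStream, with the same overloading pattern
class Box[T](val data: Seq[T]) {
  // overload taking a Scala function
  def map[R](fun: T => R): Box[R] = new Box(data.map(fun))
  // overload taking a Flink-style function object
  def map[R](fun: MapFunction[T, R]): Box[R] = new Box(data.map(fun.map))
}

object Demo extends App {
  val mails = new Box(Seq(("2015-12-01", "alice <alice@example.org>")))

  // Fails to compile ("missing parameter type for expanded function"):
  // with two overloads the compiler has no single expected type to
  // drive inference for the pattern-matching anonymous function.
  //   mails.map { case (date, sender) => date.take(7) }

  // Works, because the parameter type is spelled out explicitly:
  val months = mails.map((p: (String, String)) => p._1.take(7))
  println(months.data.head) // 2015-12
}
```
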
In the past couple of days my colleague Andrea and I have tried several
ways to address the problem, coming up with two possible solutions:

   1. don't overload and use different names, e.g. `map` taking a Scala
   function and `mapWith` taking a Flink MapFunction
   2. keep only the method taking a Scala function (which would be more
   idiomatic from a Scala perspective, IMHO) and providing an implicit
   conversion from the Flink function to the Scala function within the
   `org.apache.flink.api.scala` package object

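To illustrate the first option, here is a sketch on the same hypothetical
stand-ins (`Box` and `MapFunction` are not Flink types; `mapWith` is just
the proposed name):

```scala
trait MapFunction[T, R] { def map(value: T): R }

class Box[T](val data: Seq[T]) {
  // a single, un-overloaded `map` for Scala functions: pattern matching
  // now compiles, since there is exactly one expected parameter type
  def map[R](fun: T => R): Box[R] = new Box(data.map(fun))
  // a dedicated entry point for Flink-style function objects
  def mapWith[R](fun: MapFunction[T, R]): Box[R] = new Box(data.map(fun.map))
}

object Demo extends App {
  val mails = new Box(Seq(("2015-12-01", "alice <alice@example.org>")))

  // tuple extraction via pattern matching works without annotations
  val months = mails.map { case (date, sender) => date.take(7) }
  println(months.data.head) // 2015-12

  // and function objects read nicely infix, e.g. `mails mapWith extractor`
  val extractor = new MapFunction[(String, String), String] {
    def map(value: (String, String)): String = value._2
  }
  val senders = mails mapWith extractor
}
```
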
We've also evaluated several other approaches using union types and type
classes, but we found them to be too complex. Regarding the two
approaches I've cited, the first would imply a breaking change to the APIs,
while the second is giving me a hard time: I'm still figuring out some
compilation errors in `flink-libraries` and `flink-contrib`, and in our
tests `RichMapFunction`s lose their state (possibly because of the double
conversion, first to a Scala function, then back to a simple `MapFunction`).
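
For reference, the second option boils down to something like the following
sketch (again with hypothetical Flink-free stand-ins, not the actual Flink
types); the suspected state loss comes from the round trip, since the
original function object ends up hidden behind a plain closure rather than
being the instance the runtime manages:

```scala
import scala.language.implicitConversions

trait MapFunction[T, R] { def map(value: T): R }

class Box[T](val data: Seq[T]) {
  // only the Scala-function variant is kept
  def map[R](fun: T => R): Box[R] = new Box(data.map(fun))
}

object Conversions {
  // implicit bridge so existing calls passing a MapFunction still compile;
  // note it erases the original object's identity behind a plain closure
  implicit def mapFunctionToFunction[T, R](fun: MapFunction[T, R]): T => R =
    fun.map
}

object Demo extends App {
  import Conversions._
  val mails = new Box(Seq(("2015-12-01", "alice <alice@example.org>")))
  // compiles thanks to the implicit conversion
  val senders = mails.map(new MapFunction[(String, String), String] {
    def map(value: (String, String)): String = value._2
  })
  println(senders.data.head) // alice <alice@example.org>
}
```
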

You can have a look at the code I've written so far here (last 2 commits):
https://github.com/radicalbit/flink/commits/1159

We had a little exchange of ideas and thought that the first solution would
be easier and also interesting from the standpoint of the ergonomics of the
API (e.g. `line mapWith new LineSplitter`) and would like to gather some
feedback on the feasibility of this change.

Would this still be regarded as a relevant improvement? What do you think
about it? Do you think there's time to work on it before the 1.0 release?
What do you think about introducing breaking changes to make this pattern
available to Scala users?

Thank you all in advance for your feedback.

-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
