Just to clarify, this is an example of a change I had to make to get the code 
working with Scala 2.12. With Scala 2.11 this works:

val terminate = prevPaths
  .coGroup(nextPaths)
  .where(0).equalTo(0) {
    (prev, next, out: Collector[(Long, Long)]) => {
      val prevPaths = prev.toSet
      for (n <- next)
        if (!prevPaths.contains(n)) out.collect(n)
    }
}

With Scala 2.12 you have to change it to:

val terminate = prevPaths
  .coGroup(nextPaths)
  .where(0).equalTo(0) {
    (prev: Iterator[(Long, Long)],
     next: Iterator[(Long, Long)],
     out: Collector[(Long, Long)]) => {
      val prevPaths = prev.toSet
      for (n <- next)
        if (!prevPaths.contains(n)) out.collect(n)
    }
}

The only required change is adding explicit type annotations to the lambda 
parameters. There is no casting or other "hacky" stuff required.

> On 30. Oct 2018, at 11:43, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> I think there was no conclusion so I didn't go for breaking the API in the 
> end. What this means for users is:
> 
> - when using Scala 2.11 nothing changes
> - you can now use Scala 2.12, but you might have to add explicit types to 
> disambiguate calls. This disambiguation is not needed when using Scala 2.11
> 
>> On 26. Oct 2018, at 11:06, Chesnay Schepler <ches...@apache.org> wrote:
>> 
>> I was wondering about the outcome of this discussion and what it means for 
>> our users.
>> 
>> In particular: Does this API break only apply to 2.12 users, or also to 
>> people using 2.11?
>> 
>> On 04.10.2018 17:10, Aljoscha Krettek wrote:
>>> Hi,
>>> 
>>> I'm currently working on https://issues.apache.org/jira/browse/FLINK-7811, 
>>> with the goal of adding support for Scala 2.12. There is a bit of a hurdle 
>>> and I have to explain some context first.
>>> 
>>> With Scala 2.12, lambdas are implemented using the lambda mechanism of Java 
>>> 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This means 
>>> that the following two method definitions can both take a lambda:
>>> 
>>> def map[R](mapper: MapFunction[T, R]): DataSet[R]
>>> def map[R](fun: T => R): DataSet[R]
>>> 
>>> In simple cases the Scala compiler gives precedence to the lambda version 
>>> when you call map() with a lambda, so it works here. With 2.12 you could even 
>>> call map() with a lambda if the lambda overload weren't there, because the two 
>>> signatures are now considered the same. For Scala 2.11 we still need both 
>>> signatures, though, to allow calling map() both with a lambda and with a 
>>> MapFunction.
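>>> 
>>> For illustration, a minimal sketch (assuming a DataSet[Int] named input, the 
>>> usual org.apache.flink.api.scala._ import, and MapFunction from 
>>> org.apache.flink.api.common.functions). Both call styles compile:
>>> 
>>> val viaLambda = input.map(x => x * 2)               // picks the lambda overload
>>> val viaSam = input.map(new MapFunction[Int, Int] {  // explicit MapFunction
>>>   override def map(value: Int): Int = value * 2
>>> })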
>>> 
>>> The problem is with more complicated method signatures, like:
>>> 
>>> def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit): 
>>> DataSet[R]
>>> 
>>> def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]
>>> 
>>> (for reference, GroupReduceFunction is a SAM with void 
>>> reduce(java.lang.Iterable<T> values, Collector<O> out))
>>> 
>>> These two signatures are not the same but similar enough for the Scala 2.12 
>>> compiler to "get confused". In Scala 2.11, I could call reduceGroup() with 
>>> a lambda that doesn't have parameter type definitions and things would be 
>>> fine. With Scala 2.12 I can't do that because the compiler can't figure out 
>>> which method to call and requires explicit type definitions on the lambda 
>>> parameters.
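>>> 
>>> To make this concrete, a small sketch (same assumptions as above, i.e. a 
>>> DataSet[Int] named input):
>>> 
>>> // Fine on 2.11, but ambiguous on 2.12 without parameter type annotations:
>>> input.reduceGroup { (in, out: Collector[Int]) => out.collect(in.sum) }
>>> 
>>> // Compiles on 2.12 once the parameter types are spelled out:
>>> input.reduceGroup {
>>>   (in: Iterator[Int], out: Collector[Int]) => out.collect(in.sum)
>>> }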
>>> 
>>> I see some solutions for this:
>>> 
>>> 1. Keep the methods as they are; this would force people to always explicitly 
>>> specify parameter types on their lambdas.
>>> 
>>> 2. Rename the second method to reduceGroupJ() to signal that it takes a user 
>>> function based on the Java-style interfaces (its first parameter is a 
>>> java.lang.Iterable, while the Scala lambda takes a scala.Iterator). This 
>>> disambiguates the calls, so users can use lambdas without specifying explicit 
>>> parameter types, but it breaks the API.
>>> 
>>> One effect of 2. would be that we could add a reduceGroup() method that takes 
>>> an api.scala.GroupReduceFunction with proper Scala types, which would allow 
>>> people to implement user functions without having to cast the various 
>>> Iterator/Iterable parameters.
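>>> 
>>> A rough sketch of what such a Scala-side SAM could look like (name and 
>>> package are hypothetical, nothing is decided yet):
>>> 
>>> // would live somewhere like org.apache.flink.api.scala
>>> trait GroupReduceFunction[T, R] {
>>>   def reduce(values: Iterator[T], out: Collector[R]): Unit
>>> }
>>> 
>>> // reduceGroup(reducer: GroupReduceFunction[T, R]) could then hand the user
>>> // function a plain scala.Iterator, so no Iterable-to-Iterator conversion is
>>> // needed in user code.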
>>> 
>>> Either way, people would have to adapt their code when moving to Scala 2.12 
>>> in some way, depending on what style of methods they use.
>>> 
>>> There is also solution 2.5:
>>> 
>>> 2.5 Rename the methods only in the Scala 2.12 build of Flink and keep the 
>>> old method names for Scala 2.11. This would require some infrastructure and 
>>> I don't yet know how it can be done in a sane way.
>>> 
>>> What do you think? I personally would be in favour of 2. but it breaks the 
>>> existing API.
>>> 
>>> Best,
>>> Aljoscha
