Just to clarify, here is an example of a change I had to make to get the code
working with Scala 2.12. With Scala 2.11, this works:
val terminate = prevPaths
  .coGroup(nextPaths)
  .where(0).equalTo(0) {
    (prev, next, out: Collector[(Long, Long)]) => {
      val prevPaths = prev.toSet
      for (n <- next)
        if (!prevPaths.contains(n)) out.collect(n)
    }
  }
With Scala 2.12 you have to change it to:
val terminate = prevPaths
  .coGroup(nextPaths)
  .where(0).equalTo(0) {
    (prev: Iterator[(Long, Long)], next: Iterator[(Long, Long)],
     out: Collector[(Long, Long)]) => {
      val prevPaths = prev.toSet
      for (n <- next)
        if (!prevPaths.contains(n)) out.collect(n)
    }
  }
The only required changes are explicit type annotations on lambda parameters.
No casting or other "hacky" workarounds are needed.
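
Outside of Flink, the overload clash can be reproduced with a small sketch. CoGroupSketch, MiniCoGroup, Collector and CoGroupFunction below are hypothetical stand-ins that only mimic the shape of Flink's coGroup overloads (one taking a Scala function over scala.Iterator, one taking a Java-style SAM over java.lang.Iterable); they are not the real org.apache.flink classes. The lambda is the annotated Scala 2.12 version from above, run on plain Seqs instead of DataSets.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins that mimic the shape of Flink's coGroup API.
object CoGroupSketch {
  trait Collector[T] { def collect(record: T): Unit }

  // Java-style SAM: like Flink's CoGroupFunction, it receives java.lang.Iterable.
  trait CoGroupFunction[L, R, O] {
    def coGroup(left: java.lang.Iterable[L], right: java.lang.Iterable[R], out: Collector[O]): Unit
  }

  final class MiniCoGroup[L, R](left: Seq[L], right: Seq[R]) {
    // Runs a user function against a Collector that buffers everything it receives.
    private def run[O](body: Collector[O] => Unit): Seq[O] = {
      val buf = ListBuffer.empty[O]
      body(new Collector[O] { def collect(record: O): Unit = buf += record })
      buf.toList
    }

    private def toJava[A](xs: Seq[A]): java.lang.Iterable[A] = {
      val list = new java.util.ArrayList[A]()
      xs.foreach(x => list.add(x))
      list
    }

    // Overload 1: Scala function over scala.Iterator.
    def apply[O](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): Seq[O] =
      run[O](out => fun(left.iterator, right.iterator, out))

    // Overload 2: Java-style SAM. In Scala 2.12 a lambda is also a candidate here.
    def apply[O](fun: CoGroupFunction[L, R, O]): Seq[O] =
      run[O](out => fun.coGroup(toJava(left), toJava(right), out))
  }

  // Same logic as the Scala 2.12 snippet: emit every pair of `next` not in `prev`.
  def terminate(prevPaths: Seq[(Long, Long)], nextPaths: Seq[(Long, Long)]): Seq[(Long, Long)] = {
    val group = new MiniCoGroup(prevPaths, nextPaths)
    // Fully annotated, so only overload 1 fits (Iterator is not java.lang.Iterable).
    // Per the thread, leaving prev/next unannotated is rejected by Scala 2.12.
    group { (prev: Iterator[(Long, Long)], next: Iterator[(Long, Long)], out: Collector[(Long, Long)]) =>
      val seen = prev.toSet
      for (n <- next)
        if (!seen.contains(n)) out.collect(n)
    }
  }
}
```

Because the lambda's parameters are declared as Iterator, it cannot be converted to the SAM overload, which expects java.lang.Iterable, so overload resolution is left with a single candidate.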
> On 30. Oct 2018, at 11:43, Aljoscha Krettek wrote:
>
> I think there was no conclusion, so in the end I didn't break the API.
> What this means for users is:
>
> - when using Scala 2.11, nothing changes
> - you can now use Scala 2.12, but you might have to add explicit parameter
> types to lambdas to disambiguate calls. This disambiguation is not needed
> when using Scala 2.11
>
>> On 26. Oct 2018, at 11:06, Chesnay Schepler wrote:
>>
>> I was wondering what the outcome of this discussion means for our users.
>>
>> In particular: does this API break apply only to 2.12 users, or also to
>> people using 2.11?
>>
>> On 04.10.2018 17:10, Aljoscha Krettek wrote:
>>> Hi,
>>>
>>> I'm currently working on https://issues.apache.org/jira/browse/FLINK-7811,
>>> with the goal of adding support for Scala 2.12. There is a bit of a hurdle,
>>> and I have to explain some context first.
>>>
>>> With Scala 2.12, lambdas are implemented using the lambda mechanism of Java
>>> 8, i.e. Scala lambdas can now be converted to SAM (Single Abstract Method)
>>> types. This means that both of the following method definitions can take a
>>> lambda:
>>>
>>> def map[R](mapper: MapFunction[T, R]): DataSet[R]
>>> def map[R](fun: T => R): DataSet[R]
>>>
>>> The Scala compiler gives precedence to the version that takes a Scala
>>> function when you call map() with a lambda in simple cases, so it works
>>> here. Even without that version, you could still call map() with a lambda,
>>> because a lambda can now be converted to a MapFunction. For Scala 2.11,
>>> though, we need both signatures to allow calling with a lambda and with a
>>> MapFunction.
>>>
>>> The problem is with more complicated method signatures, like:
>>>
>>> def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit): DataSet[R]
>>>
>>> def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]
>>>
>>> (For reference, GroupReduceFunction is a SAM whose single method is
>>> void reduce(java.lang.Iterable<T> values, Collector<O> out).)
>>>
>>> These two signatures are not the same, but they are similar enough for the
>>> Scala 2.12 compiler to "get confused". In Scala 2.11, I could call
>>> reduceGroup() with a lambda without parameter type annotations and things
>>> would be fine. With Scala 2.12, I can't do that: since a lambda is now a
>>> candidate for both overloads, the compiler can't figure out which method to
>>> call and requires explicit type annotations on the lambda parameters.
>>>
>>> I see some solutions for this:
>>>
>>> 1. Keep the methods as is. This would force people to always specify
>>> parameter types explicitly on their lambdas.
>>>
>>> 2. Rename the second method to reduceGroupJ() to signal that it takes a
>>> user function that takes Java-style interfaces (the first parameter is
>>> java.lang.Iterable while the Scala lambda takes a scala.Iterator). This
>>> disambiguates the calls, so users can use lambdas without specifying
>>> explicit parameter types, but it breaks the API.
>>>
>>> One effect of 2. would be that we could add a reduceGroup() method that
>>> takes an api.scala.GroupReduceFunction with proper Scala types. This would
>>> allow people to implement user functions without having to cast the
>>> various Iterator/Iterable parameters.
>>>
>>> Either way, people would have to adapt their code in some way when moving
>>> to Scala 2.12, depending on which style of methods they use.
>>>
>>> There is also solution 2.5:
>>>
>>> 2.5 Rename the methods only in the Scala 2.12 build of Flink and keep the
>>> old method names for Scala 2.11. This would require some infrastructure and
>>> I don't yet know how it can be done in a sane way.
>>>
>>> What do you think? I would personally be in favour of 2., but it breaks the
>>> existing API.
>>>
>>> Best,
>>> Aljoscha
>>>
>>
>
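
For comparison, option 2 from the quoted proposal (renaming the Java-style overload to reduceGroupJ) can be sketched with hypothetical stand-in types. RenameSketch, MiniDataSet, Collector and GroupReduceFunction below only mirror the shapes described in the thread and are not Flink's API; reduceGroupJ is the proposed name, which per the thread was not adopted. Because reduceGroup is no longer overloaded, the type of the lambda's first parameter can be inferred, and the Java-style method still accepts a lambda through Scala 2.12's SAM conversion.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins illustrating option 2 (not Flink's real classes).
object RenameSketch {
  trait Collector[T] { def collect(record: T): Unit }

  // Java-style SAM mirroring GroupReduceFunction.reduce(Iterable<T>, Collector<O>).
  trait GroupReduceFunction[T, O] {
    def reduce(values: java.lang.Iterable[T], out: Collector[O]): Unit
  }

  final class MiniDataSet[T](elems: Seq[T]) {
    // Runs a user function against a Collector that buffers everything it receives.
    private def run[R](body: Collector[R] => Unit): Seq[R] = {
      val buf = ListBuffer.empty[R]
      body(new Collector[R] { def collect(record: R): Unit = buf += record })
      buf.toList
    }

    // Scala-style variant: keeps the name and is no longer overloaded.
    def reduceGroup[R](fun: (Iterator[T], Collector[R]) => Unit): Seq[R] =
      run[R](out => fun(elems.iterator, out))

    // Java-style variant under the name proposed in option 2.
    def reduceGroupJ[R](reducer: GroupReduceFunction[T, R]): Seq[R] = {
      val values = new java.util.ArrayList[T]()
      elems.foreach(e => values.add(e))
      run[R](out => reducer.reduce(values, out))
    }
  }

  val ds = new MiniDataSet(Seq(1, 2, 3))

  // No overload to choose from, so `in` can stay unannotated; `out` is
  // annotated so that the result type R is pinned down.
  def sumScala: Seq[Int] = ds.reduceGroup { (in, out: Collector[Int]) =>
    out.collect(in.sum)
  }

  // With R fixed to Int, the lambda is SAM-converted to GroupReduceFunction.
  def sumJava: Seq[Int] = ds.reduceGroupJ[Int] { (values, out) =>
    var total = 0
    val it = values.iterator()
    while (it.hasNext) total += it.next()
    out.collect(total)
  }
}
```

The Java-style body still has to walk a java.lang.Iterable by hand, which is the kind of friction the proposed api.scala.GroupReduceFunction was meant to remove.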