I'm afraid there is no way around having that extra ".apply" because the
Scala compiler will get confused with the additional implicit parameter.
It's a bit ugly, though ...

On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Actually, there is an even easier solution (which I saw in your reply to
> my other question):
> ```
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2)).apply {
>
>     (left, right) => 1
>   }.print()
> ```
> pretty much does what I want. Explicit `apply` gives a hint that a
> compiler was missing before. Nevertheless, `createTypeInformation` works
> too, thanks for sharing!
>
> Thanks,
> Timur
>
> On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
>
>> Hi Timur,
>>
>> You have to use `createTypeInfomation` method in `org.apache.flink.api`
>> package to create TypeInformation object for Scala-specific objects such as
>> case classes, tuples, eithers, options. For example:
>>
>> ```
>> import org.apache.flink.api.scala._ // to import package object
>>
>> val a: DataSet[Thing] = …
>> val b: DataSet[Thing] = …
>>
>> a.coGroup(b)
>>   .where(e => (e.f1, e.f2))
>>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
>>     (left, right) => 1
>>   }.print()
>> ```
>>
>> Note that Flink creates internally copied 2-tuples consisted of
>> (extracted key by KeySelector, original value). So there is some
>> performance decrease when you are using KeySelector.
>>
>> Regards,
>> Chiwan Park
>>
>> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com>
>> wrote:
>> >
>> > Thank you Chiwan! Yes, I understand that there are workarounds that
>> don't use function argument (and thus do not require implicit arguments). I
>> try to avoid positional and string-based keys because there is no compiler
>> guarantees when you refactor or accidentally change the underlying case
>> classes. Providing a function is the cleanest solution (and arguably is the
>> most readable) so it'd be great to make it work.
>> >
>> > BTW, TypeInformation.of has an implementation that takes TypeHint (
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
>> which, according to documentation, is supposed to be used for generic
>> classes, but using it still leads to the same exception.
>> >
>> > Thanks,
>> > Timur
>> >
>> >
>> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org>
>> wrote:
>> > Hi Timur,
>> >
>> > You can use a composite key [1] to compare keys consisting of multiple
>> fields. For example:
>> >
>> > ```
>> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>> > a.coGroup(b)
>> >   .where(“f1”, “f2”) // Flink compares the values of f1 first, and
>> compares the values of f2 if values of f1 are same.
>> >   .equalTo(“f1”, “f2”) { // Note that you must specify same number of
>> keys
>> >     (left, right) => 1
>> >   }
>> > ```
>> >
>> > Composite key can be applied to Scala tuple also:
>> >
>> > ```
>> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
>> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
>> > a.coGroup(b)
>> >   .where(0, 1) // Note that field numbers start from 0.
>> >   .equalTo(0, 1) {
>> >     (left, right) => 1
>> >   }
>> > ```
>> >
>> > I hope this helps.
>> >
>> > [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com>
>> wrote:
>> > >
>> > > Hello,
>> > >
>> > > Another issue I have encountered is incorrect implicit resolution
>> (I'm using Scala 2.11.7). Here's the example (with a workaround):
>> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>> > > a.coGroup(b)
>> > >   .where(e => e.f1)
>> > >   //.equalTo(e => e) { //this fails to compile because equalTo
>> expects an implicit
>> > >   .equalTo("f1") {
>> > >     (left, right) => 1
>> > >   }
>> > > However, the workaround does not quite work when key is a tuple (I
>> suspect this applies to other generic classes as well):
>> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>> > > a.coGroup(b)
>> > >   .where(e => (e.f1, e.f2))
>> > >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String,
>> String)])) { (left, right) => 1} // throws InvalidProgramException
>> > > Here, I try to provide the implicit TypeInformation explicitly, but
>> apparently it's not compatible with the way implicit inference is done.
>> (TypeInformation I generate is GenericType<scala.Tuple2>, while
>> scala.Tuple2<String, String> is expected).
>> > >
>> > > Now, I can split this in 2 operations like below:
>> > > val tmp = a.coGroup(b)
>> > >   .where(e => (e.f1, e.f2))
>> > >   .equalTo(e => (e.f1, e.f2))
>> > >
>> > > tmp { (left, right) => 1}
>> > > but, I would like to avoid adding clutter to my processing logic, and
>> it's not entirely clear to me how this would be scheduled.
>> > >
>> > > As an option, I can hash the hell out of my keys like that:
>> > > a.coGroup(b)
>> > >   .where(e => (e.f1, e.f2).hashCode)
>> > >   .equalTo(e => (e.f1,
>> e.f2).hashCode)(TypeInformation.of(classOf[Int])){ (left, right) => 1}
>> > > but that, again, adds some indirection and clutter, not mentioning
>> the hassle of dealing with collisions (which can be alleviated by using
>> fancy hashes, but I'd like to avoid that).
>> > >
>> > > Any insights on what is the way to go here are highly appreciated.
>> > >
>> > > Thanks,
>> > > Timur
>> >
>> >
>>
>>
>

Reply via email to