Hi Timur,

You have to use the `createTypeInformation` method in the `org.apache.flink.api.scala` package to create a TypeInformation object for Scala-specific types such as case classes, tuples, eithers, and options. For example:
```
import org.apache.flink.api.scala._ // to import package object

val a: DataSet[Thing] = …
val b: DataSet[Thing] = …

a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
    (left, right) => 1
  }.print()
```

Note that Flink internally creates 2-tuples consisting of (key extracted by the KeySelector, original value), so there is some performance overhead when you use a KeySelector.

Regards,
Chiwan Park

> On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>
> Thank you Chiwan! Yes, I understand that there are workarounds that don't use
> a function argument (and thus do not require implicit arguments). I try to
> avoid positional and string-based keys because there are no compiler
> guarantees when you refactor or accidentally change the underlying case
> classes. Providing a function is the cleanest solution (and arguably is the
> most readable) so it'd be great to make it work.
>
> BTW, TypeInformation.of has an implementation that takes TypeHint
> (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
> which, according to documentation, is supposed to be used for generic
> classes, but using it still leads to the same exception.
>
> Thanks,
> Timur
>
>
> On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Timur,
>
> You can use a composite key [1] to compare keys consisting of multiple
> fields. For example:
>
> ```
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   // Flink compares the values of f1 first, and compares the values of f2
>   // if the values of f1 are the same.
>   .where("f1", "f2")
>   .equalTo("f1", "f2") { // Note that you must specify the same number of keys
>     (left, right) => 1
>   }
> ```
>
> A composite key can also be applied to Scala tuples:
>
> ```
> val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> a.coGroup(b)
>   .where(0, 1) // Note that field numbers start from 0.
>   .equalTo(0, 1) {
>     (left, right) => 1
>   }
> ```
>
> I hope this helps.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>
> Regards,
> Chiwan Park
>
> > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> >
> > Hello,
> >
> > Another issue I have encountered is incorrect implicit resolution (I'm
> > using Scala 2.11.7). Here's the example (with a workaround):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => e.f1)
> >   //.equalTo(e => e) { //this fails to compile because equalTo expects an implicit
> >   .equalTo("f1") {
> >     (left, right) => 1
> >   }
> > However, the workaround does not quite work when the key is a tuple (I suspect
> > this applies to other generic classes as well):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1} // throws InvalidProgramException
> > Here, I try to provide the implicit TypeInformation explicitly, but
> > apparently it's not compatible with the way implicit inference is done.
> > (The TypeInformation I generate is GenericType<scala.Tuple2>, while
> > scala.Tuple2<String, String> is expected).
> >
> > Now, I can split this in 2 operations like below:
> > val tmp = a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))
> >
> > tmp { (left, right) => 1}
> > but, I would like to avoid adding clutter to my processing logic, and it's
> > not entirely clear to me how this would be scheduled.
> >
> > As an option, I can hash the hell out of my keys like that:
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2).hashCode)
> >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])){
> >     (left, right) => 1}
> > but that, again, adds some indirection and clutter, not mentioning the
> > hassle of dealing with collisions (which can be alleviated by using fancy
> > hashes, but I'd like to avoid that).
> >
> > Any insights on what is the way to go here are highly appreciated.
> >
> > Thanks,
> > Timur
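
A note on the hashing workaround from the original message: the collision concern can be reproduced without Flink. The following is only a minimal plain-Scala sketch. It assumes `Thing` is a case class with two `String` fields `f1` and `f2` (its definition is not shown in the thread), and the helper names `tupleKey` and `hashKey` are hypothetical, introduced here for illustration. It shows two records whose tuple keys differ but whose hashed keys are equal, so a coGroup keyed on the hash would place them in the same group.

```scala
// Plain Scala, no Flink dependency.
// Assumption: Thing mirrors the case class used in the thread.
case class Thing(f1: String, f2: String)

object HashKeyDemo {
  // The composite key from the thread: a tuple of both fields.
  def tupleKey(t: Thing): (String, String) = (t.f1, t.f2)

  // The hashed variant of the same key, as in the hashCode workaround.
  def hashKey(t: Thing): Int = tupleKey(t).hashCode

  def main(args: Array[String]): Unit = {
    // "Aa" and "BB" are distinct strings with the same String.hashCode (2112),
    // and a tuple's hashCode is computed from the hashes of its elements.
    val left = Thing("Aa", "x")
    val right = Thing("BB", "x")

    println(tupleKey(left) == tupleKey(right)) // false: the real keys differ
    println(hashKey(left) == hashKey(right))   // true: the hashed keys collide
  }
}
```

With a hashed key, the coGroup function would therefore have to re-check the real key fields on both sides. Passing the tuple key together with `createTypeInformation[(String, String)]`, as suggested in the reply at the top of the thread, avoids that extra step.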