Hi Timur,

You have to use the `createTypeInformation` method from the `org.apache.flink.api.scala` 
package object to create the TypeInformation for Scala-specific types such as case 
classes, tuples, Either, and Option, and pass it explicitly. For example:

```
import org.apache.flink.api.scala._ // to import package object

val a: DataSet[Thing] = …
val b: DataSet[Thing] = …

a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
    (left, right) => 1
  }.print()
```
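Why does the explicit `createTypeInformation[(String, String)]` work when `TypeInformation.of(classOf[(String, String)])` does not? `createTypeInformation` is a macro, so it sees the full static type at compile time. The plain-Scala sketch below models that idea with a hypothetical `TypeDesc` type class (not a Flink API): a tuple descriptor is derived from the descriptors of its components, so the element types are kept.

```scala
// Hypothetical type descriptor, standing in for Flink's TypeInformation.
trait TypeDesc[T] { def describe: String }

object TypeDesc {
  // Summon an instance, similar in spirit to calling createTypeInformation[T].
  def of[T](implicit d: TypeDesc[T]): TypeDesc[T] = d

  implicit val stringDesc: TypeDesc[String] = new TypeDesc[String] {
    def describe: String = "String"
  }

  implicit val intDesc: TypeDesc[Int] = new TypeDesc[Int] {
    def describe: String = "Int"
  }

  // A tuple descriptor is built from its component descriptors at compile
  // time, so the element types are preserved instead of being erased.
  implicit def tuple2Desc[A, B](implicit a: TypeDesc[A], b: TypeDesc[B]): TypeDesc[(A, B)] =
    new TypeDesc[(A, B)] {
      def describe: String = s"Tuple2<${a.describe}, ${b.describe}>"
    }
}

val keyDesc = TypeDesc.of[(String, String)]
println(keyDesc.describe) // prints Tuple2<String, String>
```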

Note that, internally, Flink copies each record into a 2-tuple of (key extracted 
by the KeySelector, original value), so using a KeySelector incurs some 
performance overhead.
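A simplified plain-Scala model of that extra step may make the overhead concrete (the `wrapWithKey` and `unwrap` helpers are hypothetical; this only illustrates the idea and is not how Flink implements it): each record is copied into a new (key, value) pair before the keyed operation and unwrapped afterwards, so every record carries an extra field through the operation.

```scala
final case class Thing(f1: String, f2: String)

// Simplified model (not Flink code): pair each record with its extracted key,
// mirroring the (key, original value) 2-tuples described above.
def wrapWithKey[T, K](records: Seq[T])(keySelector: T => K): Seq[(K, T)] =
  records.map(r => (keySelector(r), r))

// After the keyed operation, the original records are recovered by dropping the key.
def unwrap[K, T](keyed: Seq[(K, T)]): Seq[T] = keyed.map(_._2)

val things = Seq(Thing("a", "b"), Thing("c", "d"))
val keyed = wrapWithKey(things)(t => (t.f1, t.f2))
// keyed holds ((a,b),Thing(a,b)) and ((c,d),Thing(c,d))
```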

Regards,
Chiwan Park

> On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> 
> Thank you, Chiwan! Yes, I understand that there are workarounds that don't use 
> a function argument (and thus do not require implicit arguments). I try to 
> avoid positional and string-based keys because there are no compiler 
> guarantees when you refactor or accidentally change the underlying case 
> classes. Providing a function is the cleanest solution (and arguably the 
> most readable), so it'd be great to make it work.
> 
> BTW, TypeInformation.of has an overload that takes a TypeHint 
> (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java), 
> which, according to the documentation, is meant to be used for generic 
> classes, but using it still leads to the same exception.
> 
> Thanks,
> Timur
> 
> 
> On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Timur,
> 
> You can use a composite key [1] to compare keys consisting of multiple 
> fields. For example:
> 
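> ```
> import org.apache.flink.api.scala._
> 
> case class Thing(f1: String, f2: String) // assumed definition, as used in this thread
> 
> val env = ExecutionEnvironment.getExecutionEnvironment
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   // Flink compares the values of f1 first, and compares the values of f2
>   // only if the values of f1 are the same.
>   .where("f1", "f2")
>   .equalTo("f1", "f2") { // Note that you must specify the same number of keys
>     (left, right) => 1
>   }
> ```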
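To see what the composite key changes, here is a small in-memory model of coGroup semantics in plain Scala (the `localCoGroup` helper is hypothetical and is not Flink's implementation): both inputs are grouped by key, and the function runs once per distinct key with the matching elements from each side, either of which may be empty. With the data from the example, keying on (f1, f2) gives four groups, while keying on f1 alone puts Thing("a", "b") and Thing("a", "x") into the same group.

```scala
final case class Thing(f1: String, f2: String)

// In-memory model of coGroup semantics (not Flink code): the function is called
// once per distinct key, with all left and right elements sharing that key.
def localCoGroup[L, R, K, O](left: Seq[L], right: Seq[R])(
    leftKey: L => K, rightKey: R => K)(f: (Iterator[L], Iterator[R]) => O): Seq[O] = {
  val l = left.groupBy(leftKey)
  val r = right.groupBy(rightKey)
  val keys = (l.keySet ++ r.keySet).toSeq
  keys.map { k =>
    f(l.getOrElse(k, Seq.empty[L]).iterator, r.getOrElse(k, Seq.empty[R]).iterator)
  }
}

val a = Seq(Thing("a", "b"), Thing("c", "d"))
val b = Seq(Thing("a", "x"), Thing("z", "m"))

// Composite key (f1, f2): no key occurs on both sides, so there are four groups.
val composite = localCoGroup(a, b)(t => (t.f1, t.f2), t => (t.f1, t.f2)) {
  (left, right) => (left.size, right.size)
}

// Single key f1: "a" occurs on both sides, so there are three groups.
val single = localCoGroup(a, b)(_.f1, _.f1) {
  (left, right) => (left.size, right.size)
}
```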
> 
> Composite keys can also be applied to Scala tuples:
> 
> ```
> val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> a.coGroup(b)
>   .where(0, 1) // Note that field numbers start from 0.
>   .equalTo(0, 1) {
>     (left, right) => 1
>   }
> ```
> 
> I hope this helps.
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
> 
> Regards,
> Chiwan Park
> 
> > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> >
> > Hello,
> >
> > Another issue I have encountered is incorrect implicit resolution (I'm 
> > using Scala 2.11.7). Here's an example (with a workaround):
> > ```
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => e.f1)
> >   //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit
> >   .equalTo("f1") {
> >     (left, right) => 1
> >   }
> > ```
> > However, the workaround does not quite work when the key is a tuple (I 
> > suspect this applies to other generic classes as well):
> > ```
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) {
> >     (left, right) => 1
> >   } // throws InvalidProgramException
> > ```
> > Here, I try to provide the implicit TypeInformation explicitly, but 
> > apparently it's not compatible with the way implicit inference is done 
> > (the TypeInformation I generate is GenericType<scala.Tuple2>, while 
> > scala.Tuple2<String, String> is expected).
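The GenericType above is consistent with type erasure: `classOf[(String, String)]` only carries the runtime class `scala.Tuple2`, with no record of the element types, so a TypeInformation built from it alone cannot know they are `String`. A quick plain-Scala check, no Flink involved:

```scala
// classOf only captures the runtime (erased) class; the type arguments are gone.
val stringPair: Class[_] = classOf[(String, String)]
val intStringPair: Class[_] = classOf[(Int, String)]

println(stringPair.getName)          // prints scala.Tuple2
println(stringPair == intStringPair) // prints true
```

The Scala API's `createTypeInformation` avoids this because it is a macro that runs at compile time, where the full type `(String, String)` is still known.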
> >
> > Now, I can split this into two operations like this:
> > ```
> > val tmp = a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))
> >
> > tmp { (left, right) => 1 }
> > ```
> > However, I would like to avoid adding clutter to my processing logic, and 
> > it's not entirely clear to me how this would be scheduled.
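Both the compile error in the first example (the commented-out `.equalTo(e => e) {` line) and this two-step version come down to how Scala parses argument lists. In the plain-Scala model here, `KeyType`, `Pending`, and `equalTo` are hypothetical stand-ins for TypeInformation, the value returned by `equalTo`, and `equalTo` itself: a brace block written right after `equalTo(key)` is taken as the argument for the implicit parameter list, while a block after a filled-in implicit list (or after a separate `tmp`) is an ordinary call to `apply`.

```scala
final case class Thing(f1: String, f2: String)

// Hypothetical stand-in for a type descriptor such as TypeInformation[K].
final case class KeyType[K](name: String)

object Model {
  implicit val stringPairKey: KeyType[(String, String)] = KeyType("(String, String)")

  // Hypothetical stand-in for the value returned by equalTo: it yields a
  // result only once a function is passed to its apply method.
  final class Pending[K](keyType: KeyType[K]) {
    def apply[O](f: String => O): O = f(keyType.name)
  }

  // Two parameter lists, shaped like equalTo(fun)(implicit typeInfo).
  def equalTo[T, K](fun: T => K)(implicit keyType: KeyType[K]): Pending[K] =
    new Pending(keyType)
}

import Model._

val key = (e: Thing) => (e.f1, e.f2)

// 1. Descriptor passed explicitly; the trailing block becomes .apply { ... }.
val oneLine = equalTo(key)(stringPairKey) { name => s"keyed by $name" }

// 2. Implicit resolved in its own statement, then apply is called on the result.
val tmp = equalTo(key)
val twoSteps = tmp { name => s"keyed by $name" }

// 3. Does not compile: the block is taken as the argument for the implicit list.
// val broken = equalTo(key) { name => s"keyed by $name" }
```

Since forms 1 and 2 desugar to the same calls, the one-line `equalTo(...)(createTypeInformation[(String, String)]) { ... }` and the `tmp { ... }` split build the same program; the split only changes how the code reads.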
> >
> > As an option, I can hash the hell out of my keys like this:
> > ```
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2).hashCode)
> >   .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) {
> >     (left, right) => 1
> >   }
> > ```
> > But that, again, adds some indirection and clutter, not to mention the 
> > hassle of dealing with collisions (which can be alleviated by using fancy 
> > hashes, but I'd like to avoid that).
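The collision concern applies even to short strings. In plain Scala, `"Aa"` and `"BB"` have the same `String.hashCode` (2112), and since a tuple's hash is computed from its elements' hashes, tuples that differ only in those components collide too. A key based on `.hashCode` would then put different keys into one group, which comparing the tuple key itself avoids:

```scala
// "Aa" and "BB" collide under String.hashCode: 65 * 31 + 97 == 66 * 31 + 66 == 2112.
val k1 = ("Aa", "x")
val k2 = ("BB", "x")

println("Aa".hashCode)              // prints 2112
println("BB".hashCode)              // prints 2112
println(k1.hashCode == k2.hashCode) // prints true: the tuple hash is built from the element hashes
println(k1 == k2)                   // prints false: the tuple keys themselves differ
```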
> >
> > Any insights on the right way to go here would be highly appreciated.
> >
> > Thanks,
> > Timur
> 
> 
