Hello,

Another issue I have encountered is incorrect implicit resolution (I'm
using Scala 2.11.7). Here's the example (with a workaround):

val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where(e => e.f1)
  //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit
  .equalTo("f1") {
    (left, right) => 1
  }

However, the workaround does not quite work when the key is a tuple (I
suspect this applies to other generic classes as well):

val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) {
    (left, right) => 1
  } // throws InvalidProgramException

Here, I try to pass the implicit TypeInformation explicitly, but
apparently it is not compatible with what implicit resolution would have
produced: the TypeInformation I generate is GenericType<scala.Tuple2>,
while scala.Tuple2<String, String> is expected.
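
To illustrate the mismatch outside of Flink, here is a minimal plain-Scala
sketch (the object and value names are made up for illustration). It only
shows that classOf cannot carry the tuple's element types, which would be
consistent with ending up with a generic Tuple2 type rather than
Tuple2<String, String>:

```scala
object ErasureSketch {
  // classOf only yields the erased runtime class, so the element types
  // (String, String) are not part of the resulting Class object.
  val stringPair: Class[_] = classOf[(String, String)]
  val otherPair: Class[_] = classOf[(BigInt, String)]

  def main(args: Array[String]): Unit = {
    println(stringPair.getName)      // scala.Tuple2
    println(stringPair == otherPair) // true: both are the same Tuple2 class
  }
}
```

Anything built from that Class object alone has no way of knowing the
element types.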

Now, I can split this into two operations, as below:

val tmp = a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))

tmp { (left, right) => 1}

but I would like to avoid adding clutter to my processing logic, and it's
not entirely clear to me how this would be scheduled.

As another option, I can hash the hell out of my keys like this:

a.coGroup(b)
  .where(e => (e.f1, e.f2).hashCode)
  .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) {
    (left, right) => 1
  }

but that, again, adds some indirection and clutter, not to mention the
hassle of dealing with collisions (which can be alleviated by using fancy
hashes, but I'd like to avoid that).
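
To make the collision concern concrete, here is a small plain-Scala sketch
(no Flink involved; the object and value names are made up for
illustration). It relies on the well-known fact that the strings "Aa" and
"BB" have the same hashCode, so two different tuple keys hash to the same
Int:

```scala
object HashKeyCollisionSketch {
  // "Aa" and "BB" are different strings with the same hashCode (2112).
  val key1: (String, String) = ("Aa", "x")
  val key2: (String, String) = ("BB", "x")

  def main(args: Array[String]): Unit = {
    println(key1 == key2)                   // false: the keys differ
    println(key1.hashCode == key2.hashCode) // true: their hashes collide
  }
}
```

With hashCode as the key, records carrying these two distinct keys would
land in the same group, so the co-group function would have to compare the
original fields again.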

Any insights on the right way to go here would be highly appreciated.

Thanks,
Timur