Hi, the error means that you are grouping on a field which contains null values. We can not compare elements against null, that's why we throw the exception. Are you sure that you're not having any null elements inside the DataSet you're comparing against?
I'm not 100% sure that my fix is correct .. maybe the tests will uncover that I've overseen something (they are still running). On Fri, May 15, 2015 at 2:26 PM, Flavio Pompermaier <[email protected]> wrote: > Hi Robert, > I applied your fix but still I get one error (not in the same point at > least..) > Basically what I do is: > > DataSet<Tuple2<String, DateTime>> someDates; //this is empty in my test > DataSet<Tuple3<String, String, DateTime>> someEvents; > DataSet<Tuple4<String, String, DateTime, DateTime>> res = > someEvents.coGroup(someDates).where(0).equalTo(0).with( > new myCoGroupFunction<Tuple3<String, String, DateTime>, Tuple2<String, > DateTime>, Tuple4<String, String, DateTime, DateTime>> (...)); > res.print(); > > in myCoGroupFunction I declare a Tuple4<String, String, DateTime, > DateTime> reuse = new Tuple4<>() and I collect reuse tuples with t.f3 = > null. > > Then I get this stackTrace: > > Caused by: org.apache.flink.types.NullKeyFieldException > at > org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:76) > at > org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:30) > at > org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115) > at > org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233) > at > org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194) > at > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504) > at > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:745) > > On Fri, May 15, 2015 at 1:57 PM, Flavio Pompermaier <[email protected]> > wrote: > >> Thanks a lot Robert! Don't mention it ;) >> >> >> On Fri, May 15, 2015 at 1:54 PM, Robert Metzger <[email protected]> >> wrote: >> >>> Hey, >>> >>> the patch is in my branch "flink2019". >>> Its really good that you've found the bug. We were using the wrong kryo >>> instance to create copies of generic types. >>> >>> Once travis validates that everything is good, I'll push it to master. >>> >>> On Fri, May 15, 2015 at 12:41 PM, Flavio Pompermaier < >>> [email protected]> wrote: >>> >>>> So do you think you could release a path soon? I need it to continue my >>>> work..otherwise if it's very simple you could send me the snippet of code >>>> to change my local flink version ;) >>>> >>>> Best, >>>> Flavio >>>> >>>> On Fri, May 15, 2015 at 11:22 AM, Robert Metzger <[email protected]> >>>> wrote: >>>> >>>>> Yes ;) >>>>> >>>>> On Fri, May 15, 2015 at 11:10 AM, Flavio Pompermaier < >>>>> [email protected]> wrote: >>>>> >>>>>> Do you think it's comething easy to fix..? >>>>>> >>>>>> On Fri, May 15, 2015 at 10:51 AM, Robert Metzger <[email protected] >>>>>> > wrote: >>>>>> >>>>>>> No problem ;) >>>>>>> >>>>>>> I was able to reproduce the issue and filed a JIRA for it: >>>>>>> https://issues.apache.org/jira/browse/FLINK-2019 >>>>>>> >>>>>>> On Fri, May 15, 2015 at 10:36 AM, Flavio Pompermaier < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> Unfortunately it's really difficult for me to extract the code..I'm >>>>>>>> using joda shipped with Flink 0.9-SNAPSHOT (i.e. 2.5) and before today >>>>>>>> I've >>>>>>>> never seen this error..als o because DateTime is Serializable :) >>>>>>>> >>>>>>>> On Fri, May 15, 2015 at 10:25 AM, Fabian Hueske <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Is there a chance that the version of JodaTime changed? >>>>>>>>> >>>>>>>>> 2015-05-15 10:22 GMT+02:00 Robert Metzger <[email protected]>: >>>>>>>>> >>>>>>>>>> Can you share the Flink program? >>>>>>>>>> Or at least the definition of the Tuple? >>>>>>>>>> >>>>>>>>>> I'll look into this issue in a few minutes. >>>>>>>>>> >>>>>>>>>> On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier < >>>>>>>>>> [email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> I'm using Flink 0.9-SNAPSHOT and I've never seen this error >>>>>>>>>>> before today (the job haven't changed..) >>>>>>>>>>> >>>>>>>>>>> On Fri, May 15, 2015 at 10:09 AM, Robert Metzger < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Flavio, >>>>>>>>>>>> >>>>>>>>>>>> which version of Flink are you using? >>>>>>>>>>>> If you are using 0.9 something, then this should actually work >>>>>>>>>>>> ;) >>>>>>>>>>>> >>>>>>>>>>>> On Fri, May 15, 2015 at 10:06 AM, Flavio Pompermaier < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi to all, >>>>>>>>>>>>> >>>>>>>>>>>>> this morning I run my Flink job and I got the following >>>>>>>>>>>>> exception serializing a DateTime Tuple..could you help me to >>>>>>>>>>>>> understand >>>>>>>>>>>>> what's happening here? >>>>>>>>>>>>> >>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Class cannot be >>>>>>>>>>>>> created (missing no-arg constructor): >>>>>>>>>>>>> org.joda.time.chrono.ISOChronology >>>>>>>>>>>>> Serialization trace: >>>>>>>>>>>>> iChronology (org.joda.time.DateTime) >>>>>>>>>>>>> at >>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) >>>>>>>>>>>>> at >>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) >>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) >>>>>>>>>>>>> at >>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620) >>>>>>>>>>>>> at >>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624) >>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) >>>>>>>>>>>>> at >>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.copy(ObjectField.java:140) >>>>>>>>>>>>> at >>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634) >>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:77) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:1) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:72) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:1) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504) >>>>>>>>>>>>> at >>>>>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) >>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) >>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >
