Hi,

That was a bit too early. I found an issue with my approach. I will come back once I have solved it.
Best, Fabian

2018-03-21 23:45 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi,
>
> I've opened a pull request [1] that should fix the problem.
> It would be great if you could try the change and report back whether it
> fixes the problem.
>
> Thank you,
> Fabian
>
> [1] https://github.com/apache/flink/pull/5742
>
> 2018-03-21 9:49 GMT+01:00 simone <simone.povosca...@gmail.com>:
>
>> Hi all,
>>
>> an update: following Stephan's directions on how to diagnose the issue:
>> making Person immutable, the problem does not occur.
>>
>> Simone.
>>
>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>
>> To diagnose this, can you please check the following:
>>
>> - Change the Person data type to be immutable (final fields, no setters,
>> set fields in the constructor instead). Does that make the problem go
>> away?
>>
>> - Change the Person data type to not be a POJO by adding a dummy field
>> that is never used and does not have a getter/setter. Does that make the
>> problem go away?
>>
>> If either of these is the case, it must be a mutability bug somewhere, in
>> either accidental object reuse or accidental serializer sharing.
>>
>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Simone and Flavio,
>>>
>>> I created FLINK-9031 [1] for this issue.
>>> Please have a look and add any detail that you think could help to
>>> resolve the problem.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>>
>>> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> This simple code reproduces the behavior ->
>>>> https://github.com/xseris/Flink-test-union
>>>>
>>>> Thanks, Simone.
>>>>
>>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>>
>>>> Hmmm, I still don't see the problem.
>>>> IMO, the result should be correct for both plans. The data is
>>>> replicated, filtered, reduced, and unioned.
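[Editor's note: Stephan's first diagnostic check above (an immutable Person) could look like the following minimal sketch. The field names and types are hypothetical; only "field1" appears elsewhere in the thread. Note that removing the no-arg constructor and setters also means Flink no longer classifies the type as a POJO, so it falls back to generic serialization, which is exactly what the experiment is meant to test.]

```java
// Hypothetical immutable version of the Person type from this thread:
// final fields, no setters, all values assigned in the constructor.
final class Person {

    private final String field1; // name taken from the thread
    private final String field2; // hypothetical second field

    Person(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    // Getters are fine; it is the absence of setters and of a no-arg
    // constructor that makes the instance immutable (and a non-POJO to Flink).
    String getField1() { return field1; }
    String getField2() { return field2; }
}
```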
>>>> There is nothing in between the filter and reduce that could cause
>>>> incorrect behavior.
>>>>
>>>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>>>> is either the Flink runtime code or your functions.
>>>> Given that one plan produces good results, it might be the Flink
>>>> runtime code.
>>>>
>>>> Coming back to my previous question:
>>>> Can you provide a minimal program to reproduce the issue?
>>>>
>>>> Thanks, Fabian
>>>>
>>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Ah, thanks for the update!
>>>>> I'll have a look at that.
>>>>>
>>>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>>> Hi Simone,
>>>>>>
>>>>>> Looking at the plan, I don't see why this should be happening. The
>>>>>> pseudo code looks fine as well.
>>>>>> Any chance that you can create a minimal program to reproduce the
>>>>>> problem?
>>>>>>
>>>>>> Thanks,
>>>>>> Fabian
>>>>>>
>>>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> reuse is not enabled. I attach the plan of the execution.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simone
>>>>>>>
>>>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Union is actually a very simple operator (not even an operator in
>>>>>>> Flink terms). It just merges two inputs. There is no additional
>>>>>>> logic involved.
>>>>>>> Therefore, it should also not emit records before either of the two
>>>>>>> ReduceFunctions has sorted its data.
>>>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>>>> reduced and emitted in a pipelined fashion, i.e., once the first
>>>>>>> record is reduced, it is forwarded into the MapFunction (passing the
>>>>>>> unioned inputs).
>>>>>>> So it is not unexpected that Map starts processing before the
>>>>>>> ReduceFunction has terminated.
>>>>>>>
>>>>>>> Did you enable object reuse [1]?
>>>>>>> If yes, try to disable it.
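[Editor's note: for context on the object-reuse question above, the following plain-Java sketch (no Flink dependency) shows the hazard that reuse creates for mutable types. A runtime with reuse enabled may hand the same instance to a user function for every record; a function that stores a reference instead of a copy silently corrupts earlier results. The simulation here is illustrative, not Flink's actual code path.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ObjectReuseHazard {

    static class Person {
        String name; // mutable field, as in a typical mutable POJO
        Person(String name) { this.name = name; }
    }

    // Simulates a runtime that overwrites one reused instance per record.
    // 'defensiveCopy' controls whether the "user function" copies the
    // record before storing it.
    static List<String> firstNames(boolean defensiveCopy) {
        Person reused = new Person("");          // the single, reused instance
        List<Person> collected = new ArrayList<>();
        for (String n : Arrays.asList("alice", "bob")) {
            reused.name = n;                     // runtime mutates in place
            collected.add(defensiveCopy
                    ? new Person(reused.name)    // safe: snapshot of the value
                    : reused);                   // bug: every entry aliases one object
        }
        List<String> names = new ArrayList<>();
        for (Person p : collected) names.add(p.name);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(firstNames(false)); // [bob, bob]  - earlier record clobbered
        System.out.println(firstNames(true));  // [alice, bob] - correct
    }
}
```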
>>>>>>> If you want to reuse objects, you have to be careful in how you
>>>>>>> implement your functions.
>>>>>>> If no, can you share the plan
>>>>>>> (ExecutionEnvironment.getExecutionPlan()) that was generated for the
>>>>>>> program?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Fabian
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>>>
>>>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>
>>>>>>>> Any help on this? This thing is very strange: the "manual" union of
>>>>>>>> the output of the two datasets is different from the flink-union of
>>>>>>>> them.
>>>>>>>> Could it be a problem of the Flink optimizer?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone
>>>>>>>> <simone.povosca...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Sorry, I translated the code into pseudocode too fast. That is
>>>>>>>>> indeed an equals.
>>>>>>>>>
>>>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Just a guess, but string comparison in Java should use the equals
>>>>>>>>> method, not the == operator.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Kien
>>>>>>>>>
>>>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>>>
>>>>>>>>> *subject.getField("field1") == "";*
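[Editor's note: a minimal sketch of why Kien's suggestion matters. `==` compares object references, while `equals()` compares content; a string built at runtime is generally a distinct object from the `""` literal, so `==` can return false even when the text is identical. The helper names below are illustrative, not from the thread.]

```java
class StringCompare {

    // Content comparison; the literal-first form is also null-safe.
    static boolean isEmptyByEquals(String field1) {
        return "".equals(field1);
    }

    // Reference comparison - usually wrong for strings.
    static boolean isEmptyByIdentity(String field1) {
        return field1 == "";
    }

    public static void main(String[] args) {
        // A runtime-built empty string: same content as "", different object.
        String field1 = new String("");
        System.out.println(isEmptyByEquals(field1));   // true
        System.out.println(isEmptyByIdentity(field1)); // false
    }
}
```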