Hi,

I've opened a pull request [1] that should fix the problem. It would be great if you could try the change and report back whether it fixes the problem.
Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742

2018-03-21 9:49 GMT+01:00 simone <simone.povosca...@gmail.com>:

> Hi all,
>
> an update: following Stephan's directions on how to diagnose the issue,
> making Person immutable, the problem does not occur.
>
> Simone.
>
> On 20/03/2018 20:20, Stephan Ewen wrote:
>
> To diagnose that, can you please check the following:
>
> - Change the Person data type to be immutable (final fields, no setters,
> set fields in the constructor instead). Does that make the problem go away?
>
> - Change the Person data type to not be a POJO by adding a dummy field
> that is never used but does not have a getter/setter. Does that make the
> problem go away?
>
> If either of those is the case, it must be a mutability bug somewhere, in
> either accidental object reuse or accidental serializer sharing.
>
>
> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Simone and Flavio,
>>
>> I created FLINK-9031 [1] for this issue.
>> Please have a look and add any detail that you think could help to
>> resolve the problem.
>>
>> Thanks,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>
>> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> This simple code reproduces the behavior ->
>>> https://github.com/xseris/Flink-test-union
>>>
>>> Thanks, Simone.
>>>
>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>
>>> Hmmm, I still don't see the problem.
>>> IMO, the result should be correct for both plans. The data is
>>> replicated, filtered, reduced, and unioned.
>>> There is nothing between the filter and reduce that could cause
>>> incorrect behavior.
>>>
>>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>>> is either the Flink runtime code or your functions.
>>> Given that one plan produces good results, it might be the Flink
>>> runtime code.
>>>
>>> Coming back to my previous question.
>>> Can you provide a minimal program to reproduce the issue?
>>>
>>> Thanks, Fabian
>>>
>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Ah, thanks for the update!
>>>> I'll have a look at that.
>>>>
>>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Simone,
>>>>>
>>>>> Looking at the plan, I don't see why this should be happening. The
>>>>> pseudo code looks fine as well.
>>>>> Any chance that you can create a minimal program to reproduce the
>>>>> problem?
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> reuse is not enabled. I attach the plan of the execution.
>>>>>>
>>>>>> Thanks,
>>>>>> Simone
>>>>>>
>>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Union is actually a very simple operator (not even an operator in
>>>>>> Flink terms). It just merges two inputs. There is no additional
>>>>>> logic involved.
>>>>>> Therefore, it should also not emit records before either of the two
>>>>>> ReduceFunctions has sorted its data.
>>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>>> reduced and emitted in a pipelined fashion, i.e., once the first
>>>>>> record is reduced, it is forwarded into the MapFunction (passing the
>>>>>> unioned inputs).
>>>>>> So it is not unexpected that Map starts processing before the
>>>>>> ReduceFunction terminates.
>>>>>>
>>>>>> Did you enable object reuse [1]?
>>>>>> If yes, try to disable it. If you want to reuse objects, you have to
>>>>>> be careful about how you implement your functions.
>>>>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>>>>> that was generated for the program?
>>>>>>
>>>>>> Thanks,
>>>>>> Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>>
>>>>>>
>>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> Any help on this? This thing is very strange: the "manual" union of
>>>>>>> the output of the 2 datasets is different from the Flink union of
>>>>>>> them. Could it be a problem of the Flink optimizer?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone
>>>>>>> <simone.povosca...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry, I translated the code into pseudocode too fast. That is
>>>>>>>> indeed an equals.
>>>>>>>>
>>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Just a guess, but string comparison in Java should use the equals
>>>>>>>> method, not the == operator.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Kien
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>>
>>>>>>>> subject.getField("field1") == "";
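The two concrete fixes discussed in the thread (Stephan's immutability experiment and Kien's equals-vs-== point) can be sketched together in one small example. This is an illustration only: the actual Person class never appears in the thread, so its fields here are assumptions.

```java
// Hypothetical immutable version of the Person type from the thread.
// Final fields, no setters, and no no-argument constructor mean Flink
// no longer treats this as a mutable POJO, which rules out the
// accidental-object-reuse bugs Stephan suspects.
public final class Person {

    private final String field1;
    private final int age; // assumed second field, for illustration only

    public Person(String field1, int age) {
        this.field1 = field1;
        this.age = age;
    }

    public String getField1() { return field1; }
    public int getAge() { return age; }

    public static void main(String[] args) {
        Person p = new Person("", 42);
        // String comparison must use equals(), as Kien notes:
        System.out.println(p.getField1().equals(""));          // true
        // == compares references, so it can be false for equal strings:
        System.out.println(p.getField1() == new String(""));   // false
    }
}
```

A side effect worth noting: without a public no-argument constructor and setters (or public fields), the type no longer qualifies for Flink's POJO serializer and falls back to the generic serializer. That change in serialization path is exactly what makes the experiment a useful diagnostic for serializer-sharing bugs.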