Hi,

Yes, I've updated the PR. It needs a review and should be included in Flink 1.5.
Cheers,
Fabian

simone <simone.povosca...@gmail.com> wrote on Mon., 26 Mar 2018, 12:01:

> Hi Fabian,
>
> any update on this? Did you fix it?
>
> Best, Simone.
>
> On 22/03/2018 00:24, Fabian Hueske wrote:
>
> Hi,
>
> That was a bit too early. I found an issue with my approach. Will come
> back once I've solved that.
>
> Best, Fabian
>
> 2018-03-21 23:45 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi,
>>
>> I've opened a pull request [1] that should fix the problem.
>> It would be great if you could try the change and report back whether
>> it fixes the problem.
>>
>> Thank you,
>> Fabian
>>
>> [1] https://github.com/apache/flink/pull/5742
>>
>> 2018-03-21 9:49 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>
>>> Hi all,
>>>
>>> an update: following Stephan's directives on how to diagnose the
>>> issue, making Person immutable, the problem does not occur.
>>>
>>> Simone.
>>>
>>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>>
>>> To diagnose that, can you please check the following:
>>>
>>> - Change the Person data type to be immutable (final fields, no
>>> setters, set fields in the constructor instead). Does that make the
>>> problem go away?
>>>
>>> - Change the Person data type to not be a POJO by adding a dummy
>>> field that is never used but does not have a getter/setter. Does that
>>> make the problem go away?
>>>
>>> If either of those is the case, it must be a mutability bug somewhere,
>>> in either accidental object reuse or accidental serializer sharing.
>>>
>>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Simone and Flavio,
>>>>
>>>> I created FLINK-9031 [1] for this issue.
>>>> Please have a look and add any detail that you think could help to
>>>> resolve the problem.
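Stephan's first diagnostic suggestion quoted above (make the data type immutable) can be sketched as follows. This is only an illustration; the real Person class is not shown in the thread, so the field names here are hypothetical. Note that without a public no-argument constructor and setters, Flink no longer treats the class as a POJO and falls back to its generic serializer, which is exactly what makes this a useful diagnostic: it changes the serialization code path being exercised.

```java
// Hypothetical immutable variant of the Person type from the thread:
// final fields, no setters, all state assigned in the constructor, so
// neither Flink nor user code can mutate a record after creation.
final class Person {
    private final String name;
    private final String field1;

    public Person(String name, String field1) {
        this.name = name;
        this.field1 = field1;
    }

    public String getName() { return name; }
    public String getField1() { return field1; }
}
```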
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>>>
>>>> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> This simple code reproduces the behavior ->
>>>>> https://github.com/xseris/Flink-test-union
>>>>>
>>>>> Thanks, Simone.
>>>>>
>>>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>>>
>>>>> Hmmm, I still don't see the problem.
>>>>> IMO, the result should be correct for both plans. The data is
>>>>> replicated, filtered, reduced, and unioned.
>>>>> There is nothing between the filter and reduce that could cause
>>>>> incorrect behavior.
>>>>>
>>>>> The good thing is, the optimizer seems to be fine. The bad thing is,
>>>>> it is either the Flink runtime code or your functions.
>>>>> Given that one plan produces good results, it might be the Flink
>>>>> runtime code.
>>>>>
>>>>> Coming back to my previous question:
>>>>> can you provide a minimal program to reproduce the issue?
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>>> Ah, thanks for the update!
>>>>>> I'll have a look at that.
>>>>>>
>>>>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>>
>>>>>>> Hi Simone,
>>>>>>>
>>>>>>> Looking at the plan, I don't see why this should be happening. The
>>>>>>> pseudo code looks fine as well.
>>>>>>> Any chance that you can create a minimal program to reproduce the
>>>>>>> problem?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>> reuse is not enabled. I attach the plan of the execution.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Simone
>>>>>>>>
>>>>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Union is actually a very simple operator (not even an operator in
>>>>>>>> Flink terms). It just merges the two inputs.
>>>>>>>> There is no additional logic involved.
>>>>>>>> Therefore, it should not emit records before either of the two
>>>>>>>> ReduceFunctions has sorted its data.
>>>>>>>> Once the data has been sorted for a ReduceFunction, the data is
>>>>>>>> reduced and emitted in a pipelined fashion, i.e., once the first
>>>>>>>> record is reduced, it is forwarded into the MapFunction (passing
>>>>>>>> the unioned inputs).
>>>>>>>> So it is not unexpected that Map starts processing before the
>>>>>>>> ReduceFunction has terminated.
>>>>>>>>
>>>>>>>> Did you enable object reuse [1]?
>>>>>>>> If yes, try to disable it. If you want to reuse objects, you have
>>>>>>>> to be careful in how you implement your functions.
>>>>>>>> If no, can you share the plan
>>>>>>>> (ExecutionEnvironment.getExecutionPlan()) that was generated for
>>>>>>>> the program?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>>>>
>>>>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> Any help on this? This thing is very strange: the "manual" union
>>>>>>>>> of the output of the 2 datasets is different from the Flink union
>>>>>>>>> of them.
>>>>>>>>> Could it be a problem of the Flink optimizer?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Sorry, I translated the code into pseudocode too fast. That is
>>>>>>>>>> indeed an equals.
>>>>>>>>>>
>>>>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Just a guess, but string comparison in Java should use the
>>>>>>>>>> equals method, not the == operator.
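Fabian's two suggestions quoted above, disabling object reuse and dumping the execution plan, look roughly like this in a DataSet program. This is only a sketch against the Flink 1.3/1.4-era batch API and is not runnable on its own; it assumes a surrounding Flink job.

```java
// Sketch only: assumes a Flink DataSet program (Flink 1.3/1.4-era API).
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Object reuse trades safety for speed; when it is enabled, user
// functions must not hold on to input objects across invocations.
// Disabling it rules out accidental mutation as the cause.
env.getConfig().disableObjectReuse();

// ... build the DataSet program here ...

// Returns the optimized plan as a JSON string, which is what Fabian
// asked to see.
System.out.println(env.getExecutionPlan());
```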
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Kien
>>>>>>>>>>
>>>>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>>>>
>>>>>>>>>> *subject.getField("field1") == "";*
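Kien's point about the line quoted above can be illustrated with a minimal, self-contained example, independent of Flink. The `field1` value here is hypothetical; only the comparison pattern mirrors the thread.

```java
// == on strings compares object identity, not content.
class StringCompareDemo {
    public static void main(String[] args) {
        // A string built at runtime is a distinct object from the
        // literal "", even though both are empty.
        String field1 = new String("");

        System.out.println(field1 == "");      // false: different objects
        System.out.println(field1.equals("")); // true: same content
        System.out.println(field1.isEmpty());  // true: idiomatic emptiness check
    }
}
```

This is why `subject.getField("field1") == ""` silently misbehaves: it compiles and runs, but only returns true when both sides are the same interned object.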