Hi,
Yes, I've updated the PR.
It needs a review and should be included in Flink 1.5.

Cheers, Fabian

simone <simone.povosca...@gmail.com> wrote on Mon, 26 March 2018, 12:01:

> Hi Fabian,
>
> any update on this? Did you fix it?
>
> Best, Simone.
>
> On 22/03/2018 00:24, Fabian Hueske wrote:
>
> Hi,
>
> That was a bit too early.
> I found an issue with my approach. I will come back once I have solved that.
>
> Best, Fabian
>
> 2018-03-21 23:45 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi,
>>
>> I've opened a pull request [1] that should fix the problem.
>> It would be great if you could try the change and report back whether it
>> fixes the problem.
>>
>> Thank you,
>> Fabian
>>
>> [1] https://github.com/apache/flink/pull/5742
>>
>> 2018-03-21 9:49 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>
>>> Hi all,
>>>
>>> an update: following Stephan's suggestions for diagnosing the issue,
>>> I made Person immutable, and the problem no longer occurs.
>>>
>>> Simone.
>>>
>>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>>
>>> To diagnose that, can you please check the following:
>>>
>>>   - Change the Person data type to be immutable (final fields, no
>>> setters, set fields in constructor instead). Does that make the problem go
>>> away?
>>>
>>>   - Change the Person data type to not be a POJO by adding a dummy
>>> field that is never used and does not have a getter/setter. Does that
>>> make the problem go away?
>>>
>>> If either of those is the case, it must be a mutability bug, caused by
>>> either accidental object reuse or accidental serializer sharing.
>>>
>>>
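To make Stephan's first check concrete, here is a minimal sketch of what an immutable Person could look like. The thread never shows the real class, so the fields `name` and `age` and the `withAge` helper are placeholders, not the actual code from the reproducer. Note that a class like this has no setters and no no-argument constructor, so it presumably no longer matches Flink's POJO rules either.

```java
// Hypothetical Person type: the real fields are not shown in the thread,
// so `name` and `age` are placeholders chosen for illustration.
final class ImmutablePerson {
    private final String name;
    private final int age;

    // All fields are set once, in the constructor.
    ImmutablePerson(String name, int age) {
        this.name = name;
        this.age = age;
    }

    String getName() { return name; }
    int getAge() { return age; }

    // Instead of a setter, return a modified copy and leave this instance untouched.
    ImmutablePerson withAge(int newAge) {
        return new ImmutablePerson(name, newAge);
    }
}

class ImmutablePersonDemo {
    public static void main(String[] args) {
        ImmutablePerson original = new ImmutablePerson("Ada", 36);
        ImmutablePerson older = original.withAge(37);
        // The original instance keeps its value; only the copy differs.
        System.out.println(original.getAge() + " " + older.getAge());
    }
}
```

If user functions only ever create new instances like this, a record that is accidentally shared between two code paths cannot be altered behind the back of either of them, which is why this change is a useful diagnostic.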
>>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Simone and Flavio,
>>>>
>>>> I created FLINK-9031 [1] for this issue.
>>>> Please have a look and add any detail that you think could help to
>>>> resolve the problem.
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>>>
>>>> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> This simple code reproduces the behavior ->
>>>>> https://github.com/xseris/Flink-test-union
>>>>>
>>>>> Thanks, Simone.
>>>>>
>>>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>>>
>>>>> Hmmm, I still don't see the problem.
>>>>> IMO, the result should be correct for both plans. The data is
>>>>> replicated, filtered, reduced, and unioned.
>>>>> There is nothing between the filter and the reduce that could cause
>>>>> incorrect behavior.
>>>>>
>>>>> The good thing is, the optimizer seems to be fine. The bad thing is,
>>>>> it is either the Flink runtime code or your functions.
>>>>> Given that one plan produces good results, it might be the Flink
>>>>> runtime code.
>>>>>
>>>>> Coming back to my previous question.
>>>>> Can you provide a minimal program to reproduce the issue?
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>>> Ah, thanks for the update!
>>>>>> I'll have a look at that.
>>>>>>
>>>>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>>
>>>>>>> Hi Simone,
>>>>>>>
>>>>>>> Looking at the plan, I don't see why this should be happening. The
>>>>>>> pseudo code looks fine as well.
>>>>>>> Any chance that you can create a minimal program to reproduce the
>>>>>>> problem?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>> Object reuse is not enabled. I have attached the execution plan.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Simone
>>>>>>>>
>>>>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Union is actually a very simple operator (not even an operator in
>>>>>>>> Flink terms). It just merges two inputs. There is no additional logic
>>>>>>>> involved.
>>>>>>>> Therefore, it should also not emit records before either of the two
>>>>>>>> ReduceFunctions has sorted its data.
>>>>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>>>>> reduced and emitted in a pipelined fashion, i.e., once the first
>>>>>>>> record is reduced, it is forwarded into the MapFunction (passing the
>>>>>>>> unioned inputs).
>>>>>>>> So it is not unexpected that Map starts processing before the
>>>>>>>> ReduceFunction has terminated.
>>>>>>>>
>>>>>>>> Did you enable object reuse [1]?
>>>>>>>> If yes, try to disable it. If you want to reuse objects, you have
>>>>>>>> to be careful in how you implement your functions.
>>>>>>>> If no, can you share the plan
>>>>>>>> (ExecutionEnvironment.getExecutionPlan()) that was generated for the
>>>>>>>> program?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>>>>
>>>>>>>>
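As an illustration of why functions need care when objects are reused, here is a plain-Java simulation. It is not Flink API: `Record`, `collectReferences`, and `collectCopies` are hypothetical names, and the loop only mimics a runtime that hands the same mutable instance to user code for every element. In Flink itself the switch lives on the ExecutionConfig (`enableObjectReuse()` / `disableObjectReuse()`).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, NOT Flink API. All names here are hypothetical.
class ObjectReuseSketch {
    // A mutable record, standing in for a user data type with setters.
    static final class Record {
        String value;
    }

    // Unsafe under reuse: keeps a reference to the shared instance,
    // so every stored entry ends up showing the last value written.
    static List<Record> collectReferences(String[] inputs) {
        Record reused = new Record();
        List<Record> kept = new ArrayList<>();
        for (String in : inputs) {
            reused.value = in;   // the "runtime" overwrites the same object
            kept.add(reused);    // the "function" stores the reference
        }
        return kept;
    }

    // Safe under reuse: copies the data out before the object is overwritten.
    static List<Record> collectCopies(String[] inputs) {
        Record reused = new Record();
        List<Record> kept = new ArrayList<>();
        for (String in : inputs) {
            reused.value = in;
            Record copy = new Record();
            copy.value = reused.value;
            kept.add(copy);
        }
        return kept;
    }

    public static void main(String[] args) {
        String[] inputs = {"a", "b", "c"};
        System.out.println(collectReferences(inputs).get(0).value); // prints "c"
        System.out.println(collectCopies(inputs).get(0).value);     // prints "a"
    }
}
```

The first variant silently corrupts earlier results, which is the kind of symptom that a mutability bug in user code or in the runtime would produce.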
>>>>>>>>
>>>>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>
>>>>>>>>> Any help on this? This is very strange: the "manual" union
>>>>>>>>> of the output of the 2 datasets is different from the Flink union of
>>>>>>>>> them.
>>>>>>>>> Could it be a problem with the Flink optimizer?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Sorry, I translated the code into pseudocode too fast. That is
>>>>>>>>>> indeed an equals.
>>>>>>>>>>
>>>>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Just a guess, but string comparison in Java should use the equals
>>>>>>>>>> method, not the == operator.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Kien
>>>>>>>>>>
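Kien's point can be shown with a small, self-contained sketch. The class and method names below are made up for illustration; only the `==` versus `equals` behavior is standard Java.

```java
// Illustrative only: the class and method names are hypothetical.
class StringCompareSketch {
    // Compares content; calling equals on the literal also makes this null-safe.
    static boolean isEmptyByEquals(String s) {
        return "".equals(s);
    }

    // Compares object identity; true only if s is the very same String object
    // as the interned literal "".
    static boolean isEmptyByIdentity(String s) {
        return s == "";
    }

    public static void main(String[] args) {
        // A String created at runtime is a distinct object from the literal "".
        String runtimeEmpty = new String("");
        System.out.println(isEmptyByEquals(runtimeEmpty));   // prints "true"
        System.out.println(isEmptyByIdentity(runtimeEmpty)); // prints "false"
    }
}
```

Strings that come out of deserialization or parsing are generally new objects, so an identity check against a literal tends to fail even when the content matches.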
>>>>>>>>>>
>>>>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>>>>
>>>>>>>>>> *subject.getField("field1") == "";*
