Hi,

I've opened a pull request [1] that should fix the problem.
It would be great if you could try the change and report back whether it
fixes the problem.

Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742

2018-03-21 9:49 GMT+01:00 simone <simone.povosca...@gmail.com>:

> Hi all,
>
> an update: following Stephan's suggestions on how to diagnose the issue,
> I made Person immutable, and the problem no longer occurs.
>
> Simone.
>
> On 20/03/2018 20:20, Stephan Ewen wrote:
>
> To diagnose that, can you please check the following:
>
>   - Change the Person data type to be immutable (final fields, no setters,
> set fields in constructor instead). Does that make the problem go away?
>
>   - Change the Person data type to not be a POJO by adding a dummy field
> that is never used and has no getter/setter. Does that make the
> problem go away?
>
> If either is the case, it must be a mutability bug somewhere, caused by
> either accidental object reuse or accidental serializer sharing.
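
A minimal sketch of the first check (an immutable Person), in case it helps
anyone following along. The fields `name` and `age` and the helper `withAge`
are assumptions made for illustration; the actual Person class in the
reproducer may look different.

```java
import java.util.Objects;

// Immutable variant of a Person type: final fields, no setters,
// all state is set in the constructor.
// NOTE: the fields `name` and `age` are hypothetical.
final class Person {
    private final String name;
    private final int age;

    Person(String name, int age) {
        this.name = Objects.requireNonNull(name, "name");
        this.age = age;
    }

    String getName() { return name; }
    int getAge() { return age; }

    // A "modification" returns a new instance instead of mutating this one.
    Person withAge(int newAge) { return new Person(name, newAge); }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Person)) return false;
        Person other = (Person) o;
        return age == other.age && name.equals(other.name);
    }

    @Override
    public int hashCode() { return Objects.hash(name, age); }
}
```

As far as I understand Flink's POJO rules, a class without a public
no-argument constructor and without setters is no longer handled as a POJO,
so this variant may change both mutability and the serializer at once.
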
>
>
> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Simone and Flavio,
>>
>> I created FLINK-9031 [1] for this issue.
>> Please have a look and add any detail that you think could help to
>> resolve the problem.
>>
>> Thanks,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>
>> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> This simple code reproduces the behavior ->
>>> https://github.com/xseris/Flink-test-union
>>>
>>> Thanks, Simone.
>>>
>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>
>>> Hmmm, I still don't see the problem.
>>> IMO, the result should be correct for both plans. The data is
>>> replicated, filtered, reduced, and unioned.
>>> There is nothing between the filter and the reduce that could cause
>>> incorrect behavior.
>>>
>>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>>> is either the Flink runtime code or your functions.
>>> Given that one plan produces good results, it might be the Flink runtime
>>> code.
>>>
>>> Coming back to my previous question.
>>> Can you provide a minimal program to reproduce the issue?
>>>
>>> Thanks, Fabian
>>>
>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> Ah, thanks for the update!
>>>> I'll have a look at that.
>>>>
>>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Simone,
>>>>>
>>>>> Looking at the plan, I don't see why this should be happening. The
>>>>> pseudo code looks fine as well.
>>>>> Any chance that you can create a minimal program to reproduce the
>>>>> problem?
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> Object reuse is not enabled. I have attached the execution plan.
>>>>>>
>>>>>> Thanks,
>>>>>> Simone
>>>>>>
>>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Union is actually a very simple operator (not even an operator in
>>>>>> Flink terms). It just merges two inputs. There is no additional logic
>>>>>> involved.
>>>>>> Therefore, it should not emit records before either of the two
>>>>>> ReduceFunctions has sorted its data.
>>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>>> reduced and emitted in a pipelined fashion, i.e., once the first
>>>>>> record is reduced, it is forwarded into the MapFunction (passing
>>>>>> through the unioned inputs).
>>>>>> So it is not unexpected that Map starts processing before the
>>>>>> ReduceFunction has finished.
>>>>>>
>>>>>> Did you enable object reuse [1]?
>>>>>> If yes, try to disable it. If you want to reuse objects, you have to
>>>>>> be careful in how you implement your functions.
>>>>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>>>>> that was generated for the program?
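
To illustrate why functions need care when objects are reused, here is a
plain-Java simulation. It is not Flink code and does not use any Flink API;
the names `ReuseDemo`, `Rec`, `collectWithReuse` and `collectWithCopies` are
made up for this sketch. It only shows the general hazard: keeping a
reference to a record that is later overwritten.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink involved): a mutable record that is
// reused between calls corrupts results if a reference to it is kept.
class ReuseDemo {
    // Hypothetical mutable record type.
    static final class Rec {
        String value;
    }

    // Writes every input into ONE shared instance and keeps a reference.
    static List<Rec> collectWithReuse(String[] inputs) {
        List<Rec> kept = new ArrayList<>();
        Rec shared = new Rec();
        for (String in : inputs) {
            shared.value = in;  // overwrites the same object each time
            kept.add(shared);   // bug: stores a reference, not a copy
        }
        return kept;
    }

    // Creates a fresh instance per input, so kept records stay intact.
    static List<Rec> collectWithCopies(String[] inputs) {
        List<Rec> kept = new ArrayList<>();
        for (String in : inputs) {
            Rec fresh = new Rec();
            fresh.value = in;
            kept.add(fresh);
        }
        return kept;
    }

    // Extracts the string values for easy comparison.
    static List<String> values(List<Rec> recs) {
        List<String> out = new ArrayList<>();
        for (Rec r : recs) {
            out.add(r.value);
        }
        return out;
    }
}
```

With the inputs "a", "b", "c", the reuse variant ends up holding three
references to the same object, so all three values read "c", while the
copying variant keeps "a", "b", "c".
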
>>>>>>
>>>>>> Thanks,
>>>>>> Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> Any help on this? This is very strange: the "manual" union of
>>>>>>> the outputs of the 2 datasets differs from the Flink union of them.
>>>>>>> Could it be a problem with the Flink optimizer?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry, I translated the code into pseudocode too quickly. The
>>>>>>>> actual code does use equals.
>>>>>>>>
>>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Just a guess, but string comparison in Java should use the equals
>>>>>>>> method, not the == operator.
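
A small sketch of the difference, for reference. The class and method names
(`FieldCheck`, `isEmptyByReference`, `isEmptyByContent`) are made up for this
example and are not part of the original program.

```java
// Contrasts reference comparison (==) with content comparison (equals)
// for an "is this field empty?" check.
final class FieldCheck {
    // Compares object identity: true only if `value` is the very same
    // String object as the "" literal.
    static boolean isEmptyByReference(String value) {
        return value == "";
    }

    // Compares content; calling equals on the literal also makes the
    // check null-safe.
    static boolean isEmptyByContent(String value) {
        return "".equals(value);
    }
}
```

For a string built at runtime, e.g. `new String("")`, the reference check
returns false even though the content is empty, while the content check
returns true.
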
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Kien
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>>
>>>>>>>> *subject.getField("field1") == "";*
>>>>>>>>
