Hi,

That was a bit too early.
I found an issue with my approach. Will come back once I solved that.

Best, Fabian

2018-03-21 23:45 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi,
>
> I've opened a pull request [1] that should fix the problem.
> It would be great if you could try change and report back whether it fixes
> the problem.
>
> Thank you,
> Fabian
>
> [1] https://github.com/apache/flink/pull/5742
>
> 2018-03-21 9:49 GMT+01:00 simone <simone.povosca...@gmail.com>:
>
>> Hi all,
>>
>> an update: following Stephan directives on how to diagnose the issue,
>> making Person immutable, the problem does not occur.
>>
>> Simone.
>>
>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>
>> To diagnose that, can you please check the following:
>>
>>   - Change the Person data type to be immutable (final fields, no
>> setters, set fields in constructor instead). Does that make the problem go
>> away?
>>
>>   - Change the Person data type to not be a POJO by adding a dummy fields
>> that is never used, but does not have a getter/setter. Does that make
>> the problem go away?
>>
>> If either of that is the case, it must be a mutability bug somewhere in
>> either accidental object reuse or accidental serializer sharing.
>>
>>
>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Simone and Flavio,
>>>
>>> I created FLINK-9031 [1] for this issue.
>>> Please have a look and add any detail that you think could help to
>>> resolve the problem.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>>
>>> 2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> This simple code reproduces the behavior ->
>>>> https://github.com/xseris/Flink-test-union
>>>>
>>>> Thanks, Simone.
>>>>
>>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>>
>>>> Hmmm, I still don't see the problem.
>>>> IMO, the result should be correct for both plans. The data is
>>>> replicated, filtered, reduced, and unioned.
>>>> There is nothing in between the filter and reduce, that could cause
>>>> incorrect behavior.
>>>>
>>>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>>>> is either the Flink runtime code or your functions.
>>>> Given that one plan produces good results, it might be the Flink
>>>> runtime code.
>>>>
>>>> Coming back to my previous question.
>>>> Can you provide a minimal program to reproduce the issue?
>>>>
>>>> Thanks, Fabian
>>>>
>>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Ah, thanks for the update!
>>>>> I'll have a look at that.
>>>>>
>>>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>>> HI Simone,
>>>>>>
>>>>>> Looking at the plan, I don't see why this should be happening. The
>>>>>> pseudo code looks fine as well.
>>>>>> Any chance that you can create a minimal program to reproduce the
>>>>>> problem?
>>>>>>
>>>>>> Thanks,
>>>>>> Fabian
>>>>>>
>>>>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> reuse is not enabled. I attach the plan of the execution.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simone
>>>>>>>
>>>>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Union is actually a very simple operator (not even an operator in
>>>>>>> Flink terms). It just merges to inputs. There is no additional logic
>>>>>>> involved.
>>>>>>> Therefore, it should also not emit records before either of both
>>>>>>> ReduceFunctions sorted its data.
>>>>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>>>>> reduced and emitted in a pipelined fashion, i.e., once the first record 
>>>>>>> is
>>>>>>> reduced, it is forwarded into the MapFunction (passing the unioned 
>>>>>>> inputs).
>>>>>>> So it is not unexpected that Map starts processing before the
>>>>>>> ReduceFunction terminated.
>>>>>>>
>>>>>>> Did you enable object reuse [1]?
>>>>>>> If yes, try to disable it. If you want to reuse objects, you have to
>>>>>>> be careful in how you implement your functions.
>>>>>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>>>>>> that was generated for the program?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Fabian
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>>>>>> dev/batch/index.html#operating-on-data-objects-in-functions
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>
>>>>>>>> Any help on this? This thing is very strange..the "manual" union of
>>>>>>>> the output of the 2 datasets is different than the flink-union of 
>>>>>>>> them..
>>>>>>>> Could it be a problem of the flink optimizer?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <
>>>>>>>> simone.povosca...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Sorry, I translated the code into pseudocode too fast. That is
>>>>>>>>> indeed an equals.
>>>>>>>>>
>>>>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Just a guest, but string compare in Java should be using equals
>>>>>>>>> method, not == operator.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Kien
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>>>>
>>>>>>>>> *subject.getField("field1") == "";*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>

Reply via email to