Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to resolve
the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:

> Hi Fabian,
>
> This simple code reproduces the behavior -> https://github.com/xseris/
> Flink-test-union
>
> Thanks, Simone.
>
> On 19/03/2018 15:44, Fabian Hueske wrote:
>
> Hmmm, I still don't see the problem.
> IMO, the result should be correct for both plans. The data is replicated,
> filtered, reduced, and unioned.
> There is nothing in between the filter and reduce, that could cause
> incorrect behavior.
>
> The good thing is, the optimizer seems to be fine. The bad thing is, it is
> either the Flink runtime code or your functions.
> Given that one plan produces good results, it might be the Flink runtime
> code.
>
> Coming back to my previous question.
> Can you provide a minimal program to reproduce the issue?
>
> Thanks, Fabian
>
> 2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Ah, thanks for the update!
>> I'll have a look at that.
>>
>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> HI Simone,
>>>
>>> Looking at the plan, I don't see why this should be happening. The
>>> pseudo code looks fine as well.
>>> Any chance that you can create a minimal program to reproduce the
>>> problem?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> reuse is not enabled. I attach the plan of the execution.
>>>>
>>>> Thanks,
>>>> Simone
>>>>
>>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>>
>>>> Hi,
>>>>
>>>> Union is actually a very simple operator (not even an operator in Flink
>>>> terms). It just merges to inputs. There is no additional logic involved.
>>>> Therefore, it should also not emit records before either of both
>>>> ReduceFunctions sorted its data.
>>>> Once the data has been sorted for the ReduceFunction, the data is
>>>> reduced and emitted in a pipelined fashion, i.e., once the first record is
>>>> reduced, it is forwarded into the MapFunction (passing the unioned inputs).
>>>> So it is not unexpected that Map starts processing before the
>>>> ReduceFunction terminated.
>>>>
>>>> Did you enable object reuse [1]?
>>>> If yes, try to disable it. If you want to reuse objects, you have to be
>>>> careful in how you implement your functions.
>>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>>> that was generated for the program?
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>>> dev/batch/index.html#operating-on-data-objects-in-functions
>>>>
>>>>
>>>>
>>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Any help on this? This thing is very strange..the "manual" union of
>>>>> the output of the 2 datasets is different than the flink-union of them..
>>>>> Could it be a problem of the flink optimizer?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry, I translated the code into pseudocode too fast. That is indeed
>>>>>> an equals.
>>>>>>
>>>>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Just a guest, but string compare in Java should be using equals
>>>>>> method, not == operator.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Kien
>>>>>>
>>>>>>
>>>>>> On 3/16/2018 9:47 PM, simone wrote:
>>>>>>
>>>>>> *subject.getField("field1") == "";*
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
>

Reply via email to