Re: Strange behavior on filter, group and reduce DataSets

2018-03-26 Thread simone

Hi Fabian,

any update on this? Did you fix it?

Best, Simone.


On 22/03/2018 00:24, Fabian Hueske wrote:

Hi,

That was a bit too early.
I found an issue with my approach. I will come back once I have solved it.

Best, Fabian

2018-03-21 23:45 GMT+01:00 Fabian Hueske:


Hi,

I've opened a pull request [1] that should fix the problem.
It would be great if you could try the change and report back whether
it fixes the problem.

Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742


2018-03-21 9:49 GMT+01:00 simone:

Hi all,

an update: following Stephan's directives on how to diagnose the
issue: with Person made immutable, the problem does not occur.

Simone.


On 20/03/2018 20:20, Stephan Ewen wrote:

To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final
fields, no setters, set fields in constructor instead). Does
that make the problem go away?

  - Change the Person data type to not be a POJO by adding a
dummy field that is never used and does not have a
getter/setter. Does that make the problem go away?

If either of those is the case, it must be a mutability bug
somewhere in either accidental object reuse or accidental
serializer sharing.


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske wrote:

Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think
could help to resolve the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031


2018-03-19 16:35 GMT+01:00 simone:

Hi Fabian,

This simple code reproduces the behavior ->
https://github.com/xseris/Flink-test-union


Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:

Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans.
The data is replicated, filtered, reduced, and unioned.
There is nothing in between the filter and reduce
that could cause incorrect behavior.

The good thing is, the optimizer seems to be fine.
The bad thing is, it is either the Flink runtime
code or your functions.
Given that one plan produces good results, it might
be the Flink runtime code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the
issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske:

Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske:

Hi Simone,

Looking at the plan, I don't see why this
should be happening. The pseudo code looks
fine as well.
Any chance that you can create a minimal
program to reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone:

Hi Fabian,

reuse is not enabled. I attach the plan
of the execution.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:

Hi,

Union is actually a very simple
operator (not even an operator in Flink
terms). It just merges two inputs. There
is no additional logic involved.
Therefore, it should also not emit
records before either of the two
ReduceFunctions has sorted its data.
Once the data has been sorted for the
 

Re: Strange behavior on filter, group and reduce DataSets

2018-03-26 Thread Fabian Hueske
Hi,
Yes, I've updated the PR.
It needs a review and should be included in Flink 1.5.

Cheers, Fabian

simone wrote on Mon., 26 March 2018, 12:01:

> Hi Fabian,
>
> any update on this? Did you fix it?
>
> Best, Simone.
>
> On 22/03/2018 00:24, Fabian Hueske wrote:
>
> Hi,
>
> That was a bit too early.
> I found an issue with my approach. Will come back once I solved that.
>
> Best, Fabian
>
> 2018-03-21 23:45 GMT+01:00 Fabian Hueske :
>
>> Hi,
>>
>> I've opened a pull request [1] that should fix the problem.
>> It would be great if you could try change and report back whether it
>> fixes the problem.
>>
>> Thank you,
>> Fabian
>>
>> [1] https://github.com/apache/flink/pull/5742
>>
>> 2018-03-21 9:49 GMT+01:00 simone :
>>
>>> Hi all,
>>>
>>> an update: following Stephan directives on how to diagnose the issue,
>>> making Person immutable, the problem does not occur.
>>>
>>> Simone.
>>>
>>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>>
>>> To diagnose that, can you please check the following:
>>>
>>>   - Change the Person data type to be immutable (final fields, no
>>> setters, set fields in constructor instead). Does that make the problem go
>>> away?
>>>
>>>   - Change the Person data type to not be a POJO by adding a dummy
>>> fields that is never used, but does not have a getter/setter. Does that
>>> make the problem go away?
>>>
>>> If either of that is the case, it must be a mutability bug somewhere in
>>> either accidental object reuse or accidental serializer sharing.
>>>
>>>
>>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske 
>>> wrote:
>>>
 Hi Simone and Flavio,

 I created FLINK-9031 [1] for this issue.
 Please have a look and add any detail that you think could help to
 resolve the problem.

 Thanks,
 Fabian

 [1] https://issues.apache.org/jira/browse/FLINK-9031

 2018-03-19 16:35 GMT+01:00 simone :

> Hi Fabian,
>
> This simple code reproduces the behavior ->
> https://github.com/xseris/Flink-test-union
>
> Thanks, Simone.
>
> On 19/03/2018 15:44, Fabian Hueske wrote:
>
> Hmmm, I still don't see the problem.
> IMO, the result should be correct for both plans. The data is
> replicated, filtered, reduced, and unioned.
> There is nothing in between the filter and reduce, that could cause
> incorrect behavior.
>
> The good thing is, the optimizer seems to be fine. The bad thing is,
> it is either the Flink runtime code or your functions.
> Given that one plan produces good results, it might be the Flink
> runtime code.
>
> Coming back to my previous question.
> Can you provide a minimal program to reproduce the issue?
>
> Thanks, Fabian
>
> 2018-03-19 15:15 GMT+01:00 Fabian Hueske :
>
>> Ah, thanks for the update!
>> I'll have a look at that.
>>
>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>>
>>> HI Simone,
>>>
>>> Looking at the plan, I don't see why this should be happening. The
>>> pseudo code looks fine as well.
>>> Any chance that you can create a minimal program to reproduce the
>>> problem?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2018-03-19 12:04 GMT+01:00 simone :
>>>
 Hi Fabian,

 reuse is not enabled. I attach the plan of the execution.

 Thanks,
 Simone

 On 19/03/2018 11:36, Fabian Hueske wrote:

 Hi,

 Union is actually a very simple operator (not even an operator in
 Flink terms). It just merges to inputs. There is no additional logic
 involved.
 Therefore, it should also not emit records before either of both
 ReduceFunctions sorted its data.
 Once the data has been sorted for the ReduceFunction, the data is
 reduced and emitted in a pipelined fashion, i.e., once the first 
 record is
 reduced, it is forwarded into the MapFunction (passing the unioned 
 inputs).
 So it is not unexpected that Map starts processing before the
 ReduceFunction terminated.

 Did you enable object reuse [1]?
 If yes, try to disable it. If you want to reuse objects, you have
 to be careful in how you implement your functions.
 If no, can you share the plan
 (ExecutionEnvironment.getExecutionPlan()) that was generated for the
 program?

 Thanks,
 Fabian

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions



 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier 
 :

> Any help 

Re: Strange behavior on filter, group and reduce DataSets

2018-03-21 Thread Fabian Hueske
Hi,

That was a bit too early.
I found an issue with my approach. I will come back once I have solved it.

Best, Fabian

2018-03-21 23:45 GMT+01:00 Fabian Hueske :

> Hi,
>
> I've opened a pull request [1] that should fix the problem.
> It would be great if you could try change and report back whether it fixes
> the problem.
>
> Thank you,
> Fabian
>
> [1] https://github.com/apache/flink/pull/5742
>
> 2018-03-21 9:49 GMT+01:00 simone :
>
>> Hi all,
>>
>> an update: following Stephan directives on how to diagnose the issue,
>> making Person immutable, the problem does not occur.
>>
>> Simone.
>>
>> On 20/03/2018 20:20, Stephan Ewen wrote:
>>
>> To diagnose that, can you please check the following:
>>
>>   - Change the Person data type to be immutable (final fields, no
>> setters, set fields in constructor instead). Does that make the problem go
>> away?
>>
>>   - Change the Person data type to not be a POJO by adding a dummy fields
>> that is never used, but does not have a getter/setter. Does that make
>> the problem go away?
>>
>> If either of that is the case, it must be a mutability bug somewhere in
>> either accidental object reuse or accidental serializer sharing.
>>
>>
>> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske  wrote:
>>
>>> Hi Simone and Flavio,
>>>
>>> I created FLINK-9031 [1] for this issue.
>>> Please have a look and add any detail that you think could help to
>>> resolve the problem.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>>
>>> 2018-03-19 16:35 GMT+01:00 simone :
>>>
 Hi Fabian,

 This simple code reproduces the behavior ->
 https://github.com/xseris/Flink-test-union

 Thanks, Simone.

 On 19/03/2018 15:44, Fabian Hueske wrote:

 Hmmm, I still don't see the problem.
 IMO, the result should be correct for both plans. The data is
 replicated, filtered, reduced, and unioned.
 There is nothing in between the filter and reduce, that could cause
 incorrect behavior.

 The good thing is, the optimizer seems to be fine. The bad thing is, it
 is either the Flink runtime code or your functions.
 Given that one plan produces good results, it might be the Flink
 runtime code.

 Coming back to my previous question.
 Can you provide a minimal program to reproduce the issue?

 Thanks, Fabian

 2018-03-19 15:15 GMT+01:00 Fabian Hueske :

> Ah, thanks for the update!
> I'll have a look at that.
>
> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>
>> HI Simone,
>>
>> Looking at the plan, I don't see why this should be happening. The
>> pseudo code looks fine as well.
>> Any chance that you can create a minimal program to reproduce the
>> problem?
>>
>> Thanks,
>> Fabian
>>
>> 2018-03-19 12:04 GMT+01:00 simone :
>>
>>> Hi Fabian,
>>>
>>> reuse is not enabled. I attach the plan of the execution.
>>>
>>> Thanks,
>>> Simone
>>>
>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>
>>> Hi,
>>>
>>> Union is actually a very simple operator (not even an operator in
>>> Flink terms). It just merges to inputs. There is no additional logic
>>> involved.
>>> Therefore, it should also not emit records before either of both
>>> ReduceFunctions sorted its data.
>>> Once the data has been sorted for the ReduceFunction, the data is
>>> reduced and emitted in a pipelined fashion, i.e., once the first record 
>>> is
>>> reduced, it is forwarded into the MapFunction (passing the unioned 
>>> inputs).
>>> So it is not unexpected that Map starts processing before the
>>> ReduceFunction terminated.
>>>
>>> Did you enable object reuse [1]?
>>> If yes, try to disable it. If you want to reuse objects, you have to
>>> be careful in how you implement your functions.
>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>> that was generated for the program?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/batch/index.html#operating-on-data-objects-in-functions
>>>
>>>
>>>
>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>>
 Any help on this? This thing is very strange..the "manual" union of
 the output of the 2 datasets is different than the flink-union of 
 them..
 Could it be a problem of the flink optimizer?

 Best,
 Flavio

 On Fri, Mar 16, 2018 at 4:01 PM, simone <
 simone.povosca...@gmail.com> wrote:

> Sorry, I translated the code into pseudocode too fast. That is
> indeed an equals.

Re: Strange behavior on filter, group and reduce DataSets

2018-03-21 Thread Fabian Hueske
Hi,

I've opened a pull request [1] that should fix the problem.
It would be great if you could try the change and report back whether it fixes
the problem.

Thank you,
Fabian

[1] https://github.com/apache/flink/pull/5742

2018-03-21 9:49 GMT+01:00 simone :

> Hi all,
>
> an update: following Stephan directives on how to diagnose the issue,
> making Person immutable, the problem does not occur.
>
> Simone.
>
> On 20/03/2018 20:20, Stephan Ewen wrote:
>
> To diagnose that, can you please check the following:
>
>   - Change the Person data type to be immutable (final fields, no setters,
> set fields in constructor instead). Does that make the problem go away?
>
>   - Change the Person data type to not be a POJO by adding a dummy fields
> that is never used, but does not have a getter/setter. Does that make the
> problem go away?
>
> If either of that is the case, it must be a mutability bug somewhere in
> either accidental object reuse or accidental serializer sharing.
>
>
> On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske  wrote:
>
>> Hi Simone and Flavio,
>>
>> I created FLINK-9031 [1] for this issue.
>> Please have a look and add any detail that you think could help to
>> resolve the problem.
>>
>> Thanks,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9031
>>
>> 2018-03-19 16:35 GMT+01:00 simone :
>>
>>> Hi Fabian,
>>>
>>> This simple code reproduces the behavior ->
>>> https://github.com/xseris/Flink-test-union
>>>
>>> Thanks, Simone.
>>>
>>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>>
>>> Hmmm, I still don't see the problem.
>>> IMO, the result should be correct for both plans. The data is
>>> replicated, filtered, reduced, and unioned.
>>> There is nothing in between the filter and reduce, that could cause
>>> incorrect behavior.
>>>
>>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>>> is either the Flink runtime code or your functions.
>>> Given that one plan produces good results, it might be the Flink runtime
>>> code.
>>>
>>> Coming back to my previous question.
>>> Can you provide a minimal program to reproduce the issue?
>>>
>>> Thanks, Fabian
>>>
>>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske :
>>>
 Ah, thanks for the update!
 I'll have a look at that.

 2018-03-19 15:13 GMT+01:00 Fabian Hueske :

> HI Simone,
>
> Looking at the plan, I don't see why this should be happening. The
> pseudo code looks fine as well.
> Any chance that you can create a minimal program to reproduce the
> problem?
>
> Thanks,
> Fabian
>
> 2018-03-19 12:04 GMT+01:00 simone :
>
>> Hi Fabian,
>>
>> reuse is not enabled. I attach the plan of the execution.
>>
>> Thanks,
>> Simone
>>
>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>
>> Hi,
>>
>> Union is actually a very simple operator (not even an operator in
>> Flink terms). It just merges to inputs. There is no additional logic
>> involved.
>> Therefore, it should also not emit records before either of both
>> ReduceFunctions sorted its data.
>> Once the data has been sorted for the ReduceFunction, the data is
>> reduced and emitted in a pipelined fashion, i.e., once the first record 
>> is
>> reduced, it is forwarded into the MapFunction (passing the unioned 
>> inputs).
>> So it is not unexpected that Map starts processing before the
>> ReduceFunction terminated.
>>
>> Did you enable object reuse [1]?
>> If yes, try to disable it. If you want to reuse objects, you have to
>> be careful in how you implement your functions.
>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>> that was generated for the program?
>>
>> Thanks,
>> Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/batch/index.html#operating-on-data-objects-in-functions
>>
>>
>>
>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>
>>> Any help on this? This thing is very strange..the "manual" union of
>>> the output of the 2 datasets is different than the flink-union of them..
>>> Could it be a problem of the flink optimizer?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Mar 16, 2018 at 4:01 PM, simone >> > wrote:
>>>
 Sorry, I translated the code into pseudocode too fast. That is
 indeed an equals.

 On 16/03/2018 15:58, Kien Truong wrote:

 Hi,

 Just a guest, but string compare in Java should be using equals
 method, not == operator.

 Regards,

 Kien


 On 3/16/2018 9:47 PM, simone wrote:

 

Re: Strange behavior on filter, group and reduce DataSets

2018-03-21 Thread simone

Hi all,

an update: following Stephan's directives on how to diagnose the issue:
with Person made immutable, the problem does not occur.


Simone.


On 20/03/2018 20:20, Stephan Ewen wrote:

To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no 
setters, set fields in constructor instead). Does that make the 
problem go away?


  - Change the Person data type to not be a POJO by adding a dummy 
fields that is never used, but does not have a getter/setter. Does 
that make the problem go away?


If either of that is the case, it must be a mutability bug somewhere 
in either accidental object reuse or accidental serializer sharing.



On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske > wrote:


Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to
resolve the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031


2018-03-19 16:35 GMT+01:00 simone >:

Hi Fabian,

This simple code reproduces the behavior ->
https://github.com/xseris/Flink-test-union


Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:

Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is
replicated, filtered, reduced, and unioned.
There is nothing in between the filter and reduce, that could
cause incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad
thing is, it is either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the
Flink runtime code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske >:

Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske
>:

HI Simone,

Looking at the plan, I don't see why this should be
happening. The pseudo code looks fine as well.
Any chance that you can create a minimal program to
reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone
>:

Hi Fabian,

reuse is not enabled. I attach the plan of the
execution.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:

Hi,

Union is actually a very simple operator (not
even an operator in Flink terms). It just merges
to inputs. There is no additional logic involved.
Therefore, it should also not emit records
before either of both ReduceFunctions sorted its
data.
Once the data has been sorted for the
ReduceFunction, the data is reduced and emitted
in a pipelined fashion, i.e., once the first
record is reduced, it is forwarded into the
MapFunction (passing the unioned inputs).
So it is not unexpected that Map starts
processing before the ReduceFunction terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse
objects, you have to be careful in how you
implement your functions.
If no, can you share the plan
(ExecutionEnvironment.getExecutionPlan()) that
was generated for the program?

Thanks,
Fabian

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions





2018-03-19 9:51 GMT+01:00 Flavio Pompermaier
>:

Any help on this? This thing is very
strange..the 

Re: Strange behavior on filter, group and reduce DataSets

2018-03-20 Thread Stephan Ewen
To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no setters,
set fields in constructor instead). Does that make the problem go away?

  - Change the Person data type to not be a POJO by adding a dummy field
that is never used and does not have a getter/setter. Does that make the
problem go away?

If either of those is the case, it must be a mutability bug somewhere in
either accidental object reuse or accidental serializer sharing.
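For illustration, a minimal sketch of the two variants described above, assuming a Person with two String fields (the real class is not part of this thread):

// Variant 1: an immutable Person (final fields, no setters, constructor only).
// Without a public no-argument constructor and setters, Flink no longer treats
// the type as a POJO and falls back to its generic (Kryo) serializer.
public class Person {

    private final String name;
    private final String field1;

    public Person(String name, String field1) {
        this.name = name;
        this.field1 = field1;
    }

    public String getName()   { return name; }
    public String getField1() { return field1; }
}

// Variant 2: keep the original mutable class, but add a private dummy field
// without a getter/setter, e.g.
//
//     private int notAPojo;
//
// A field that is neither public nor exposed via getter/setter also disables
// POJO treatment while leaving the rest of the class unchanged.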


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske  wrote:

> Hi Simone and Flavio,
>
> I created FLINK-9031 [1] for this issue.
> Please have a look and add any detail that you think could help to resolve
> the problem.
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9031
>
> 2018-03-19 16:35 GMT+01:00 simone :
>
>> Hi Fabian,
>>
>> This simple code reproduces the behavior ->
>> https://github.com/xseris/Flink-test-union
>>
>> Thanks, Simone.
>>
>> On 19/03/2018 15:44, Fabian Hueske wrote:
>>
>> Hmmm, I still don't see the problem.
>> IMO, the result should be correct for both plans. The data is replicated,
>> filtered, reduced, and unioned.
>> There is nothing in between the filter and reduce, that could cause
>> incorrect behavior.
>>
>> The good thing is, the optimizer seems to be fine. The bad thing is, it
>> is either the Flink runtime code or your functions.
>> Given that one plan produces good results, it might be the Flink runtime
>> code.
>>
>> Coming back to my previous question.
>> Can you provide a minimal program to reproduce the issue?
>>
>> Thanks, Fabian
>>
>> 2018-03-19 15:15 GMT+01:00 Fabian Hueske :
>>
>>> Ah, thanks for the update!
>>> I'll have a look at that.
>>>
>>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>>>
 HI Simone,

 Looking at the plan, I don't see why this should be happening. The
 pseudo code looks fine as well.
 Any chance that you can create a minimal program to reproduce the
 problem?

 Thanks,
 Fabian

 2018-03-19 12:04 GMT+01:00 simone :

> Hi Fabian,
>
> reuse is not enabled. I attach the plan of the execution.
>
> Thanks,
> Simone
>
> On 19/03/2018 11:36, Fabian Hueske wrote:
>
> Hi,
>
> Union is actually a very simple operator (not even an operator in
> Flink terms). It just merges to inputs. There is no additional logic
> involved.
> Therefore, it should also not emit records before either of both
> ReduceFunctions sorted its data.
> Once the data has been sorted for the ReduceFunction, the data is
> reduced and emitted in a pipelined fashion, i.e., once the first record is
> reduced, it is forwarded into the MapFunction (passing the unioned 
> inputs).
> So it is not unexpected that Map starts processing before the
> ReduceFunction terminated.
>
> Did you enable object reuse [1]?
> If yes, try to disable it. If you want to reuse objects, you have to
> be careful in how you implement your functions.
> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
> that was generated for the program?
>
> Thanks,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/batch/index.html#operating-on-data-objects-in-functions
>
>
>
> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>
>> Any help on this? This thing is very strange..the "manual" union of
>> the output of the 2 datasets is different than the flink-union of them..
>> Could it be a problem of the flink optimizer?
>>
>> Best,
>> Flavio
>>
>> On Fri, Mar 16, 2018 at 4:01 PM, simone 
>> wrote:
>>
>>> Sorry, I translated the code into pseudocode too fast. That is
>>> indeed an equals.
>>>
>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>
>>> Hi,
>>>
>>> Just a guest, but string compare in Java should be using equals
>>> method, not == operator.
>>>
>>> Regards,
>>>
>>> Kien
>>>
>>>
>>> On 3/16/2018 9:47 PM, simone wrote:
>>>
>>> *subject.getField("field1") == "";*
>>>
>>>
>>>
>>
>>
>
>

>>>
>>
>>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-20 Thread Fabian Hueske
Hi Simone and Flavio,

I created FLINK-9031 [1] for this issue.
Please have a look and add any detail that you think could help to resolve
the problem.

Thanks,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9031

2018-03-19 16:35 GMT+01:00 simone :

> Hi Fabian,
>
> This simple code reproduces the behavior -> https://github.com/xseris/
> Flink-test-union
>
> Thanks, Simone.
>
> On 19/03/2018 15:44, Fabian Hueske wrote:
>
> Hmmm, I still don't see the problem.
> IMO, the result should be correct for both plans. The data is replicated,
> filtered, reduced, and unioned.
> There is nothing in between the filter and reduce, that could cause
> incorrect behavior.
>
> The good thing is, the optimizer seems to be fine. The bad thing is, it is
> either the Flink runtime code or your functions.
> Given that one plan produces good results, it might be the Flink runtime
> code.
>
> Coming back to my previous question.
> Can you provide a minimal program to reproduce the issue?
>
> Thanks, Fabian
>
> 2018-03-19 15:15 GMT+01:00 Fabian Hueske :
>
>> Ah, thanks for the update!
>> I'll have a look at that.
>>
>> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>>
>>> HI Simone,
>>>
>>> Looking at the plan, I don't see why this should be happening. The
>>> pseudo code looks fine as well.
>>> Any chance that you can create a minimal program to reproduce the
>>> problem?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2018-03-19 12:04 GMT+01:00 simone :
>>>
 Hi Fabian,

 reuse is not enabled. I attach the plan of the execution.

 Thanks,
 Simone

 On 19/03/2018 11:36, Fabian Hueske wrote:

 Hi,

 Union is actually a very simple operator (not even an operator in Flink
 terms). It just merges to inputs. There is no additional logic involved.
 Therefore, it should also not emit records before either of both
 ReduceFunctions sorted its data.
 Once the data has been sorted for the ReduceFunction, the data is
 reduced and emitted in a pipelined fashion, i.e., once the first record is
 reduced, it is forwarded into the MapFunction (passing the unioned inputs).
 So it is not unexpected that Map starts processing before the
 ReduceFunction terminated.

 Did you enable object reuse [1]?
 If yes, try to disable it. If you want to reuse objects, you have to be
 careful in how you implement your functions.
 If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
 that was generated for the program?

 Thanks,
 Fabian

 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/batch/index.html#operating-on-data-objects-in-functions



 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :

> Any help on this? This thing is very strange..the "manual" union of
> the output of the 2 datasets is different than the flink-union of them..
> Could it be a problem of the flink optimizer?
>
> Best,
> Flavio
>
> On Fri, Mar 16, 2018 at 4:01 PM, simone 
> wrote:
>
>> Sorry, I translated the code into pseudocode too fast. That is indeed
>> an equals.
>>
>> On 16/03/2018 15:58, Kien Truong wrote:
>>
>> Hi,
>>
>> Just a guest, but string compare in Java should be using equals
>> method, not == operator.
>>
>> Regards,
>>
>> Kien
>>
>>
>> On 3/16/2018 9:47 PM, simone wrote:
>>
>> *subject.getField("field1") == "";*
>>
>>
>>
>
>


>>>
>>
>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread simone

Hi Fabian,

This simple code reproduces the behavior -> 
https://github.com/xseris/Flink-test-union


Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:

Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is 
replicated, filtered, reduced, and unioned.
There is nothing in between the filter and reduce, that could cause 
incorrect behavior.


The good thing is, the optimizer seems to be fine. The bad thing is, 
it is either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the Flink 
runtime code.


Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske >:


Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske >:

HI Simone,

Looking at the plan, I don't see why this should be happening.
The pseudo code looks fine as well.
Any chance that you can create a minimal program to reproduce
the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone >:

Hi Fabian,

reuse is not enabled. I attach the plan of the execution.

Thanks,
Simone


On 19/03/2018 11:36, Fabian Hueske wrote:

Hi,

Union is actually a very simple operator (not even an
operator in Flink terms). It just merges to inputs. There
is no additional logic involved.
Therefore, it should also not emit records before either
of both ReduceFunctions sorted its data.
Once the data has been sorted for the ReduceFunction, the
data is reduced and emitted in a pipelined fashion, i.e.,
once the first record is reduced, it is forwarded into
the MapFunction (passing the unioned inputs).
So it is not unexpected that Map starts processing before
the ReduceFunction terminated.

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects,
you have to be careful in how you implement your functions.
If no, can you share the plan
(ExecutionEnvironment.getExecutionPlan()) that was
generated for the program?

Thanks,
Fabian

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions





2018-03-19 9:51 GMT+01:00 Flavio Pompermaier
>:

Any help on this? This thing is very strange..the
"manual" union of the output of the 2 datasets is
different than the flink-union of them..
Could it be a problem of the flink optimizer?

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone
> wrote:

Sorry, I translated the code into pseudocode too
fast. That is indeed an equals.


On 16/03/2018 15:58, Kien Truong wrote:


Hi,

Just a guest, but string compare in Java should
be using equals method, not == operator.

Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:

subject.getField("field1") == "";














Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is replicated,
filtered, reduced, and unioned.
There is nothing in between the filter and reduce that could cause
incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad thing is, it is
either the Flink runtime code or your functions.
Given that one plan produces good results, it might be the Flink runtime
code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske :

> Ah, thanks for the update!
> I'll have a look at that.
>
> 2018-03-19 15:13 GMT+01:00 Fabian Hueske :
>
>> HI Simone,
>>
>> Looking at the plan, I don't see why this should be happening. The pseudo
>> code looks fine as well.
>> Any chance that you can create a minimal program to reproduce the problem?
>>
>> Thanks,
>> Fabian
>>
>> 2018-03-19 12:04 GMT+01:00 simone :
>>
>>> Hi Fabian,
>>>
>>> reuse is not enabled. I attach the plan of the execution.
>>>
>>> Thanks,
>>> Simone
>>>
>>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>>
>>> Hi,
>>>
>>> Union is actually a very simple operator (not even an operator in Flink
>>> terms). It just merges to inputs. There is no additional logic involved.
>>> Therefore, it should also not emit records before either of both
>>> ReduceFunctions sorted its data.
>>> Once the data has been sorted for the ReduceFunction, the data is
>>> reduced and emitted in a pipelined fashion, i.e., once the first record is
>>> reduced, it is forwarded into the MapFunction (passing the unioned inputs).
>>> So it is not unexpected that Map starts processing before the
>>> ReduceFunction terminated.
>>>
>>> Did you enable object reuse [1]?
>>> If yes, try to disable it. If you want to reuse objects, you have to be
>>> careful in how you implement your functions.
>>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>>> that was generated for the program?
>>>
>>> Thanks,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/batch/index.html#operating-on-data-objects-in-functions
>>>
>>>
>>>
>>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>>
 Any help on this? This thing is very strange..the "manual" union of the
 output of the 2 datasets is different than the flink-union of them..
 Could it be a problem of the flink optimizer?

 Best,
 Flavio

 On Fri, Mar 16, 2018 at 4:01 PM, simone 
 wrote:

> Sorry, I translated the code into pseudocode too fast. That is indeed
> an equals.
>
> On 16/03/2018 15:58, Kien Truong wrote:
>
> Hi,
>
> Just a guest, but string compare in Java should be using equals
> method, not == operator.
>
> Regards,
>
> Kien
>
>
> On 3/16/2018 9:47 PM, simone wrote:
>
> *subject.getField("field1") == "";*
>
>
>


>>>
>>>
>>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
Ah, thanks for the update!
I'll have a look at that.

2018-03-19 15:13 GMT+01:00 Fabian Hueske :

> HI Simone,
>
> Looking at the plan, I don't see why this should be happening. The pseudo
> code looks fine as well.
> Any chance that you can create a minimal program to reproduce the problem?
>
> Thanks,
> Fabian
>
> 2018-03-19 12:04 GMT+01:00 simone :
>
>> Hi Fabian,
>>
>> reuse is not enabled. I attach the plan of the execution.
>>
>> Thanks,
>> Simone
>>
>> On 19/03/2018 11:36, Fabian Hueske wrote:
>>
>> Hi,
>>
>> Union is actually a very simple operator (not even an operator in Flink
>> terms). It just merges to inputs. There is no additional logic involved.
>> Therefore, it should also not emit records before either of both
>> ReduceFunctions sorted its data.
>> Once the data has been sorted for the ReduceFunction, the data is reduced
>> and emitted in a pipelined fashion, i.e., once the first record is reduced,
>> it is forwarded into the MapFunction (passing the unioned inputs).
>> So it is not unexpected that Map starts processing before the
>> ReduceFunction terminated.
>>
>> Did you enable object reuse [1]?
>> If yes, try to disable it. If you want to reuse objects, you have to be
>> careful in how you implement your functions.
>> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
>> that was generated for the program?
>>
>> Thanks,
>> Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/batch/index.html#operating-on-data-objects-in-functions
>>
>>
>>
>> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>>
>>> Any help on this? This thing is very strange..the "manual" union of the
>>> output of the 2 datasets is different than the flink-union of them..
>>> Could it be a problem of the flink optimizer?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Mar 16, 2018 at 4:01 PM, simone 
>>> wrote:
>>>
 Sorry, I translated the code into pseudocode too fast. That is indeed
 an equals.

 On 16/03/2018 15:58, Kien Truong wrote:

 Hi,

 Just a guest, but string compare in Java should be using equals method,
 not == operator.

 Regards,

 Kien


 On 3/16/2018 9:47 PM, simone wrote:

 *subject.getField("field1") == "";*



>>>
>>>
>>
>>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
Hi Simone,

Looking at the plan, I don't see why this should be happening. The pseudo
code looks fine as well.
Any chance that you can create a minimal program to reproduce the problem?

Thanks,
Fabian

2018-03-19 12:04 GMT+01:00 simone :

> Hi Fabian,
>
> reuse is not enabled. I attach the plan of the execution.
>
> Thanks,
> Simone
>
> On 19/03/2018 11:36, Fabian Hueske wrote:
>
> Hi,
>
> Union is actually a very simple operator (not even an operator in Flink
> terms). It just merges to inputs. There is no additional logic involved.
> Therefore, it should also not emit records before either of both
> ReduceFunctions sorted its data.
> Once the data has been sorted for the ReduceFunction, the data is reduced
> and emitted in a pipelined fashion, i.e., once the first record is reduced,
> it is forwarded into the MapFunction (passing the unioned inputs).
> So it is not unexpected that Map starts processing before the
> ReduceFunction terminated.
>
> Did you enable object reuse [1]?
> If yes, try to disable it. If you want to reuse objects, you have to be
> careful in how you implement your functions.
> If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
> that was generated for the program?
>
> Thanks,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/batch/index.html#operating-on-data-objects-in-functions
>
>
>
> 2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :
>
>> Any help on this? This thing is very strange..the "manual" union of the
>> output of the 2 datasets is different than the flink-union of them..
>> Could it be a problem of the flink optimizer?
>>
>> Best,
>> Flavio
>>
>> On Fri, Mar 16, 2018 at 4:01 PM, simone 
>> wrote:
>>
>>> Sorry, I translated the code into pseudocode too fast. That is indeed an
>>> equals.
>>>
>>> On 16/03/2018 15:58, Kien Truong wrote:
>>>
>>> Hi,
>>>
>>> Just a guest, but string compare in Java should be using equals method,
>>> not == operator.
>>>
>>> Regards,
>>>
>>> Kien
>>>
>>>
>>> On 3/16/2018 9:47 PM, simone wrote:
>>>
>>> *subject.getField("field1") == "";*
>>>
>>>
>>>
>>
>>
>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Fabian Hueske
Hi,

Union is actually a very simple operator (not even an operator in Flink
terms). It just merges two inputs. There is no additional logic involved.
Therefore, it should also not emit records before either of the two
ReduceFunctions has sorted its data.
Once the data has been sorted for the ReduceFunction, the data is reduced
and emitted in a pipelined fashion, i.e., once the first record has been
reduced, it is forwarded into the MapFunction (passing on the unioned inputs).
So it is not unexpected that the Map starts processing before the
ReduceFunction has terminated.
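For reference, a self-contained sketch of the plan shape under discussion (one source, two filtered branches, two grouped ReduceFunctions, a union, and a MapFunction). The Tuple2 data and the filter/reduce/map bodies are placeholders, not the code of the original program:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnionShapeSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, String>> subjects = env.fromElements(
                Tuple2.of("alice", "A"), Tuple2.of("bob", "B"), Tuple2.of("alice", "B"));

        // Two filtered branches, each grouped and reduced independently.
        DataSet<Tuple2<String, String>> left = subjects
                .filter(t -> "A".equals(t.f1))
                .groupBy(0)
                .reduce((t1, t2) -> t1);          // placeholder reduce logic

        DataSet<Tuple2<String, String>> right = subjects
                .filter(t -> "B".equals(t.f1))
                .groupBy(0)
                .reduce((t1, t2) -> t2);          // placeholder reduce logic

        // Union of the two reduced branches, followed by a map over the merged records.
        left.union(right)
            .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(Tuple2<String, String> value) {
                    return value;                 // placeholder map logic
                }
            })
            .print();
    }
}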

Did you enable object reuse [1]?
If yes, try to disable it. If you want to reuse objects, you have to be
careful in how you implement your functions.
If no, can you share the plan (ExecutionEnvironment.getExecutionPlan())
that was generated for the program?
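As a pointer, a minimal sketch of where both switches live: object reuse is toggled on the ExecutionConfig, and the JSON plan comes from getExecutionPlan(). The tiny placeholder job is only there so that the plan is not empty:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PlanDumpSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Object reuse is off by default; enableObjectReuse() would turn it on.
        env.getConfig().disableObjectReuse();

        // A placeholder job so that the program has at least one sink.
        env.fromElements(1, 2, 3)
           .output(new DiscardingOutputFormat<>());

        // Prints the JSON execution plan of the program defined above.
        System.out.println(env.getExecutionPlan());
    }
}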

Thanks,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-
release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions



2018-03-19 9:51 GMT+01:00 Flavio Pompermaier :

> Any help on this? This thing is very strange..the "manual" union of the
> output of the 2 datasets is different than the flink-union of them..
> Could it be a problem of the flink optimizer?
>
> Best,
> Flavio
>
> On Fri, Mar 16, 2018 at 4:01 PM, simone 
> wrote:
>
>> Sorry, I translated the code into pseudocode too fast. That is indeed an
>> equals.
>>
>> On 16/03/2018 15:58, Kien Truong wrote:
>>
>> Hi,
>>
>> Just a guest, but string compare in Java should be using equals method,
>> not == operator.
>>
>> Regards,
>>
>> Kien
>>
>>
>> On 3/16/2018 9:47 PM, simone wrote:
>>
>> *subject.getField("field1") == "";*
>>
>>
>>
>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-19 Thread Flavio Pompermaier
Any help on this? This thing is very strange.. the "manual" union of the
output of the 2 datasets is different from the Flink union of them.
Could it be a problem of the Flink optimizer?
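A sketch of the comparison being described, with made-up names (reducedA and reducedB stand for the two reduced branches); note that each collect() triggers its own execution of the plan:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;

public class UnionCompareSketch {

    // Collects the Flink union and the client-side ("manual") union of two
    // reduced DataSets so that the two results can be compared side by side.
    static <T> void compareUnions(DataSet<T> reducedA, DataSet<T> reducedB) throws Exception {
        List<T> flinkUnion = reducedA.union(reducedB).collect();

        List<T> manualUnion = new ArrayList<>(reducedA.collect());
        manualUnion.addAll(reducedB.collect());

        System.out.println("flink union  (" + flinkUnion.size() + "): " + flinkUnion);
        System.out.println("manual union (" + manualUnion.size() + "): " + manualUnion);
    }
}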

Best,
Flavio

On Fri, Mar 16, 2018 at 4:01 PM, simone  wrote:

> Sorry, I translated the code into pseudocode too fast. That is indeed an
> equals.
>
> On 16/03/2018 15:58, Kien Truong wrote:
>
> Hi,
>
> Just a guest, but string compare in Java should be using equals method,
> not == operator.
>
> Regards,
>
> Kien
>
>
> On 3/16/2018 9:47 PM, simone wrote:
>
> *subject.getField("field1") == "";*
>
>
>


Re: Strange behavior on filter, group and reduce DataSets

2018-03-16 Thread simone
Sorry, I translated the code into pseudocode too fast. That is indeed an 
equals.



On 16/03/2018 15:58, Kien Truong wrote:


Hi,

Just a guest, but string compare in Java should be using equals 
method, not == operator.


Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:

subject.getField("field1") == "";




Re: Strange behavior on filter, group and reduce DataSets

2018-03-16 Thread Kien Truong

Hi,

Just a guess, but string comparison in Java should use the equals method,
not the == operator.
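As a quick illustration of the difference, a standalone sketch (the variable name is made up):

public class StringCompareSketch {

    public static void main(String[] args) {
        String field = new String("");            // a distinct empty-string instance

        System.out.println(field == "");          // false: == compares object identity
        System.out.println(field.equals(""));     // true: equals() compares content
        System.out.println("".equals(field));     // true, and also safe if field is null
    }
}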


Regards,

Kien


On 3/16/2018 9:47 PM, simone wrote:

subject.getField("field1") == "";