Hi all,

an update: following Stephan's directions on how to diagnose the issue, I made Person immutable and the problem no longer occurs.

Simone.


On 20/03/2018 20:20, Stephan Ewen wrote:
To diagnose that, can you please check the following:

  - Change the Person data type to be immutable (final fields, no setters, set fields in the constructor instead; see the sketch below). Does that make the problem go away?

  - Change the Person data type to not be a POJO by adding a dummy field that is never used but does not have a getter/setter. Does that make the problem go away?

If either of those is the case, it must be a mutability bug somewhere, in either accidental object reuse or accidental serializer sharing.
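
For the first check, a rough sketch of what an immutable Person could look like (the field names here are placeholders, not the actual ones):

    // Sketch only: substitute the actual fields of the Person type.
    public final class Person {

        private final String name;
        private final String field1;

        public Person(String name, String field1) {
            this.name = name;
            this.field1 = field1;
        }

        // Getters only, no setters: instances cannot change after construction.
        public String getName() {
            return name;
        }

        public String getField1() {
            return field1;
        }
    }

With final fields and no setters, a record can no longer be modified in place, so accidental object reuse cannot corrupt records that were already handed downstream.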


On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:

    Hi Simone and Flavio,

    I created FLINK-9031 [1] for this issue.
    Please have a look and add any detail that you think could help to
    resolve the problem.

    Thanks,
    Fabian

    [1] https://issues.apache.org/jira/browse/FLINK-9031

    2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:

        Hi Fabian,

        This simple code reproduces the behavior ->
        https://github.com/xseris/Flink-test-union

        Thanks, Simone.


        On 19/03/2018 15:44, Fabian Hueske wrote:
        Hmmm, I still don't see the problem.
        IMO, the result should be correct for both plans. The data is
        replicated, filtered, reduced, and unioned.
        There is nothing between the filter and the reduce that
        could cause incorrect behavior.

        The good thing is that the optimizer seems to be fine. The
        bad thing is that the problem lies either in the Flink
        runtime code or in your functions.
        Given that one plan produces correct results, it might be
        the Flink runtime code.

        Coming back to my previous question: can you provide a
        minimal program to reproduce the issue?

        Thanks, Fabian

        2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

            Ah, thanks for the update!
            I'll have a look at that.

            2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

                Hi Simone,

                Looking at the plan, I don't see why this should be
                happening. The pseudocode looks fine as well.
                Any chance that you can create a minimal program to
                reproduce the problem?

                Thanks,
                Fabian

                2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:

                    Hi Fabian,

                    Object reuse is not enabled. I have attached the
                    execution plan.

                    Thanks,
                    Simone


                    On 19/03/2018 11:36, Fabian Hueske wrote:
                    Hi,

                    Union is actually a very simple operator (not
                    even an operator in Flink terms). It just merges
                    its two inputs; there is no additional logic
                    involved. Therefore, it should not emit records
                    before either of the two ReduceFunctions has
                    sorted its data.
                    Once the data has been sorted for the
                    ReduceFunction, it is reduced and emitted in a
                    pipelined fashion, i.e., as soon as the first
                    record is reduced, it is forwarded into the
                    MapFunction (which consumes the unioned inputs).
                    So it is not unexpected that the Map starts
                    processing before the ReduceFunction has
                    terminated.

                    Did you enable object reuse [1]?
                    If yes, try to disable it. If you want to reuse
                    objects, you have to be careful in how you
                    implement your functions.
                    If no, can you share the plan
                    (ExecutionEnvironment.getExecutionPlan()) that
                    was generated for the program?
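
                    For reference, checking the object reuse setting
                    and dumping the plan could look roughly like this
                    in the driver program (only a sketch; the actual
                    job goes where the placeholder comment is):

                        ExecutionEnvironment env =
                            ExecutionEnvironment.getExecutionEnvironment();

                        // Object reuse is off by default in the
                        // DataSet API; disabling it explicitly rules
                        // it out as a cause.
                        env.getConfig().disableObjectReuse();

                        // ... build the job: filter, reduce, union,
                        // map, and the sinks ...

                        // Print the generated JSON plan. Call this
                        // after the sinks are defined and before
                        // execute().
                        System.out.println(env.getExecutionPlan());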

                    Thanks,
                    Fabian

                    [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions



                    2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:

                        Any help on this? This is very strange: the
                        "manual" union of the outputs of the two
                        datasets is different from the Flink union of
                        them. Could it be a problem in the Flink
                        optimizer?
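
                        To make the comparison explicit, the two
                        variants are roughly of this shape (a sketch
                        with placeholder names; the "manual" variant
                        here merges the collected results on the
                        client):

                            // Flink union of the two reduced datasets
                            DataSet<Person> unioned =
                                reducedA.union(reducedB);

                            // "manual" union: collect both results
                            // and merge them on the client
                            List<Person> manual =
                                new ArrayList<>(reducedA.collect());
                            manual.addAll(reducedB.collect());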

                        Best,
                        Flavio

                        On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com> wrote:

                            Sorry, I translated the code into
                            pseudocode too fast. That is indeed an
                            equals.


                            On 16/03/2018 15:58, Kien Truong wrote:

                            Hi,

                            Just a guess, but string comparison in
                            Java should use the equals method, not
                            the == operator.
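
                            For example (field1 here stands in for the
                            value returned by subject.getField("field1")):

                                // e.g. a value built at runtime,
                                // not the interned literal
                                String field1 = new String("");

                                // false: == compares object identity
                                boolean byReference = (field1 == "");

                                // true: equals compares the content
                                // and is null-safe
                                boolean byContent = "".equals(field1);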

                            Regards,

                            Kien


                            On 3/16/2018 9:47 PM, simone wrote:
                            subject.getField("field1") == "";











