Hi Fabian,

Any update on this? Did you fix it?

Best, Simone.


On 22/03/2018 00:24, Fabian Hueske wrote:
Hi,

That was a bit too early.
I found an issue with my approach. Will come back once I solved that.

Best, Fabian

2018-03-21 23:45 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

    Hi,

    I've opened a pull request [1] that should fix the problem.
    It would be great if you could try the change and report back
    whether it fixes the problem.

    Thank you,
    Fabian

    [1] https://github.com/apache/flink/pull/5742

    2018-03-21 9:49 GMT+01:00 simone <simone.povosca...@gmail.com>:

        Hi all,

        An update: following Stephan's directions on how to diagnose
        the issue, I made Person immutable and the problem no longer
        occurs.

        Simone.


        On 20/03/2018 20:20, Stephan Ewen wrote:
        To diagnose that, can you please check the following:

          - Change the Person data type to be immutable (final
        fields, no setters, set fields in the constructor instead).
        Does that make the problem go away?

          - Change the Person data type to not be a POJO by adding
        a dummy field that is never used and has no getter/setter.
        Does that make the problem go away?

        If either of those makes the problem go away, it must be a
        mutability bug somewhere: either accidental object reuse or
        accidental serializer sharing.


        On Tue, Mar 20, 2018 at 3:34 PM, Fabian Hueske <fhue...@gmail.com> wrote:

            Hi Simone and Flavio,

            I created FLINK-9031 [1] for this issue.
            Please have a look and add any detail that you think
            could help to resolve the problem.

            Thanks,
            Fabian

            [1] https://issues.apache.org/jira/browse/FLINK-9031

            2018-03-19 16:35 GMT+01:00 simone <simone.povosca...@gmail.com>:

                Hi Fabian,

                This simple code reproduces the behavior ->
                https://github.com/xseris/Flink-test-union

                Thanks, Simone.


                On 19/03/2018 15:44, Fabian Hueske wrote:
                Hmmm, I still don't see the problem.
                IMO, the result should be correct for both plans.
                The data is replicated, filtered, reduced, and
                unioned. There is nothing between the filter and the
                reduce that could cause incorrect behavior.

                The good thing is, the optimizer seems to be fine.
                The bad thing is, it is either the Flink runtime
                code or your functions.
                Given that one plan produces good results, it might
                be the Flink runtime code.

                Coming back to my previous question.
                Can you provide a minimal program to reproduce the
                issue?

                Thanks, Fabian

                2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

                    Ah, thanks for the update!
                    I'll have a look at that.

                    2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

                        Hi Simone,

                        Looking at the plan, I don't see why this
                        should be happening. The pseudo code looks
                        fine as well.
                        Any chance that you can create a minimal
                        program to reproduce the problem?

                        Thanks,
                        Fabian

                        2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com>:

                            Hi Fabian,

                            Object reuse is not enabled. I have
                            attached the execution plan.

                            Thanks,
                            Simone


                            On 19/03/2018 11:36, Fabian Hueske wrote:
                            Hi,

                            Union is actually a very simple operator
                            (not even an operator in Flink terms). It
                            just merges its two inputs; there is no
                            additional logic involved. Therefore, it
                            should also not emit records before either
                            of the two ReduceFunctions has sorted its
                            data.
                            Once the data has been sorted for a
                            ReduceFunction, it is reduced and emitted
                            in a pipelined fashion, i.e., as soon as
                            the first record is reduced, it is
                            forwarded into the MapFunction (which
                            consumes the unioned inputs).
                            So it is not unexpected that Map starts
                            processing before the ReduceFunctions
                            have terminated.
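
                            A self-contained sketch of that behavior
                            (tuples stand in for your data type, and
                            groupBy().sum() stands in for the grouped
                            ReduceFunctions; all names are made up):

                                import org.apache.flink.api.common.functions.FilterFunction;
                                import org.apache.flink.api.java.DataSet;
                                import org.apache.flink.api.java.ExecutionEnvironment;
                                import org.apache.flink.api.java.tuple.Tuple2;

                                public class UnionPipelining {
                                    public static void main(String[] args) throws Exception {
                                        ExecutionEnvironment env =
                                            ExecutionEnvironment.getExecutionEnvironment();
                                        DataSet<Tuple2<String, Integer>> in = env.fromElements(
                                            Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

                                        // Two branches over the same input, each grouped
                                        // and reduced independently.
                                        DataSet<Tuple2<String, Integer>> evens = in
                                            .filter(new FilterFunction<Tuple2<String, Integer>>() {
                                                public boolean filter(Tuple2<String, Integer> t) {
                                                    return t.f1 % 2 == 0;
                                                }
                                            }).groupBy(0).sum(1);
                                        DataSet<Tuple2<String, Integer>> odds = in
                                            .filter(new FilterFunction<Tuple2<String, Integer>>() {
                                                public boolean filter(Tuple2<String, Integer> t) {
                                                    return t.f1 % 2 != 0;
                                                }
                                            }).groupBy(0).sum(1);

                                        // Union only merges the two reduced outputs; anything
                                        // downstream sees records as soon as either branch
                                        // emits them.
                                        evens.union(odds).print();
                                    }
                                }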

                            Did you enable object reuse [1]?
                            If yes, try to disable it. If you want to
                            reuse objects, you have to be careful
                            about how you implement your functions.
                            If no, can you share the plan
                            (ExecutionEnvironment.getExecutionPlan())
                            that was generated for the program?

                            Thanks,
                            Fabian

                            [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
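
                            For reference, a minimal sketch of both
                            checks on a stand-in job (object reuse is
                            off by default, so the call below only
                            makes the setting explicit):

                                import org.apache.flink.api.common.functions.MapFunction;
                                import org.apache.flink.api.java.ExecutionEnvironment;
                                import org.apache.flink.api.java.io.DiscardingOutputFormat;

                                public class PlanDump {
                                    public static void main(String[] args) throws Exception {
                                        ExecutionEnvironment env =
                                            ExecutionEnvironment.getExecutionEnvironment();

                                        // Functions receive fresh record instances,
                                        // never reused ones.
                                        env.getConfig().disableObjectReuse();

                                        // Stand-in pipeline; a sink must be defined
                                        // before the plan can be generated.
                                        env.fromElements(1, 2, 3)
                                           .map(new MapFunction<Integer, Integer>() {
                                               public Integer map(Integer v) { return v + 1; }
                                           })
                                           .output(new DiscardingOutputFormat<Integer>());

                                        // Prints the optimizer plan as JSON without
                                        // executing the job.
                                        System.out.println(env.getExecutionPlan());
                                    }
                                }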



                            2018-03-19 9:51 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:

                                Any help on this? This thing is very
                                strange: the "manual" union of the
                                outputs of the two datasets is
                                different from the Flink union of
                                them. Could it be a problem of the
                                Flink optimizer?

                                Best,
                                Flavio

                                On Fri, Mar 16, 2018 at 4:01 PM, simone <simone.povosca...@gmail.com> wrote:

                                    Sorry, I translated the code
                                    into pseudocode too fast. That
                                    is indeed an equals.


                                    On 16/03/2018 15:58, Kien
                                    Truong wrote:

                                    Hi,

                                    Just a guess, but string comparison
                                    in Java should use the equals
                                    method, not the == operator.

                                    Regards,

                                    Kien


                                    On 3/16/2018 9:47 PM, simone
                                    wrote:
                                    subject.getField("field1") == "";
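
                                    For completeness, a minimal
                                    illustration of the difference Kien
                                    points out, in plain Java:

                                        public class StringCompare {
                                            public static void main(String[] args) {
                                                String a = new String("x");
                                                String b = "x";
                                                // == compares references: false here.
                                                System.out.println(a == b);
                                                // equals compares contents: true here.
                                                System.out.println(a.equals(b));
                                                // Null-safe emptiness check for a field.
                                                System.out.println("".equals(a));
                                            }
                                        }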