Hi Fabian,

This simple code reproduces the behavior -> https://github.com/xseris/Flink-test-union

Thanks, Simone.


On 19/03/2018 15:44, Fabian Hueske wrote:
Hmmm, I still don't see the problem.
IMO, the result should be correct for both plans. The data is replicated, filtered, reduced, and unioned. There is nothing in between the filter and reduce, that could cause incorrect behavior.

The good thing is, the optimizer seems to be fine. The bad thing is, it is either the Flink runtime code or your functions. Given that one plan produces good results, it might be the Flink runtime code.

Coming back to my previous question.
Can you provide a minimal program to reproduce the issue?

Thanks, Fabian

2018-03-19 15:15 GMT+01:00 Fabian Hueske <fhue...@gmail.com <mailto:fhue...@gmail.com>>:

    Ah, thanks for the update!
    I'll have a look at that.

    2018-03-19 15:13 GMT+01:00 Fabian Hueske <fhue...@gmail.com
    <mailto:fhue...@gmail.com>>:

        HI Simone,

        Looking at the plan, I don't see why this should be happening.
        The pseudo code looks fine as well.
        Any chance that you can create a minimal program to reproduce
        the problem?

        Thanks,
        Fabian

        2018-03-19 12:04 GMT+01:00 simone <simone.povosca...@gmail.com
        <mailto:simone.povosca...@gmail.com>>:

            Hi Fabian,

            reuse is not enabled. I attach the plan of the execution.

            Thanks,
            Simone


            On 19/03/2018 11:36, Fabian Hueske wrote:
            Hi,

            Union is actually a very simple operator (not even an
            operator in Flink terms). It just merges to inputs. There
            is no additional logic involved.
            Therefore, it should also not emit records before either
            of both ReduceFunctions sorted its data.
            Once the data has been sorted for the ReduceFunction, the
            data is reduced and emitted in a pipelined fashion, i.e.,
            once the first record is reduced, it is forwarded into
            the MapFunction (passing the unioned inputs).
            So it is not unexpected that Map starts processing before
            the ReduceFunction terminated.

            Did you enable object reuse [1]?
            If yes, try to disable it. If you want to reuse objects,
            you have to be careful in how you implement your functions.
            If no, can you share the plan
            (ExecutionEnvironment.getExecutionPlan()) that was
            generated for the program?

            Thanks,
            Fabian

            [1]
            
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions
            
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#operating-on-data-objects-in-functions>



            2018-03-19 9:51 GMT+01:00 Flavio Pompermaier
            <pomperma...@okkam.it <mailto:pomperma...@okkam.it>>:

                Any help on this? This thing is very strange..the
                "manual" union of the output of the 2 datasets is
                different than the flink-union of them..
                Could it be a problem of the flink optimizer?

                Best,
                Flavio

                On Fri, Mar 16, 2018 at 4:01 PM, simone
                <simone.povosca...@gmail.com
                <mailto:simone.povosca...@gmail.com>> wrote:

                    Sorry, I translated the code into pseudocode too
                    fast. That is indeed an equals.


                    On 16/03/2018 15:58, Kien Truong wrote:

                    Hi,

                    Just a guest, but string compare in Java should
                    be using equals method, not == operator.

                    Regards,

                    Kien


                    On 3/16/2018 9:47 PM, simone wrote:
                    /subject.getField("field1") == "";//
                    /









Reply via email to