On 5/17/21 3:46 PM, Bashir Sadjad wrote:
Thanks Jan. Two points:

- I was running all the experiments I reported with `--targetParallelism=1` to make sure concurrent threads do not mess up the logs.
I think that is what causes what you see. Try increasing the parallelism to a number higher than the number of input bundles.
- I have been tracking bundles too (see @StartBundle log messages in the mini-example in my previous reply to Kenn).

I see the code, but not the log output. My suspicion would be that you see "Start bundle" -> "DEBUG INPUT" OR "DEBUG NEXT", right? If so, then this is expected - processing a bundle produces an output bundle, which is queued onto the work queue and is processed as soon as a worker is free to take it. Fetching new input produces new bundles as well, which go onto the same queue, and that is what causes the interleaving.
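
To see why a single shared work queue causes this, here is a toy model (plain Java, not Beam internals - just an illustration of the scheduling described above). A "read" task emits one INPUT bundle at a time and re-enqueues itself, processing an INPUT bundle enqueues a NEXT bundle, and a single worker (--targetParallelism=1) drains the queue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of a bundle work queue; NOT DirectRunner code.
public class BundleQueueDemo {
  static final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

  static void read(int remaining) {
    if (remaining == 0) {
      return; // no more input to read
    }
    // Emit one INPUT bundle; processing it enqueues a downstream NEXT
    // bundle onto the *same* queue.
    workQueue.add(() -> {
      System.out.println("DEBUG INPUT bundle " + remaining);
      workQueue.add(() -> System.out.println("DEBUG NEXT bundle " + remaining));
    });
    // Re-enqueue the read itself, so further input arrives later.
    workQueue.add(() -> read(remaining - 1));
  }

  public static void main(String[] args) throws InterruptedException {
    read(3);
    // One worker draining one queue: INPUT and NEXT come off interleaved.
    Runnable bundle;
    while ((bundle = workQueue.poll(100, TimeUnit.MILLISECONDS)) != null) {
      bundle.run();
    }
  }
}

This prints INPUT 3, NEXT 3, INPUT 2, NEXT 2, and so on - interleaved with no fusion involved, purely because all bundles share one queue and one worker.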


So I don't think bundles alone describe what I see. In the mini-example, processing of INPUT bundles and NEXT bundles is interleaved, e.g., 3 INPUT bundles are processed, then their outputs go through NEXT, then a few more INPUT bundles, and so on.

Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, the input to S2B also has many bundles. However, in this case /all/ of those bundles are processed first, and only then do they all go through the next stages, e.g., the logging S2B' that I mentioned. So there is no interleaving of log messages.
GBK is a stateful operation that has to wait for a trigger - in the simple batch case the trigger is the end of input, which is why you do not see outputs of the GBK interleaved with the reading of inputs. All input has to be read before the GBK can proceed and output any bundle downstream.
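
If you wanted the GBK to emit earlier panes instead of waiting for the end of input, you would need an explicit trigger upstream of it. A sketch (untested; the element count of 100 and the discarding mode are placeholders you would tune):

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Re-window the keyed input of the GroupByKey so that panes fire
// roughly every 100 buffered elements rather than only at end of input.
static PCollection<KV<String, String>> withEarlyFiring(
    PCollection<KV<String, String>> keyed) {
  return keyed.apply(
      Window.<KV<String, String>>into(new GlobalWindows())
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes());
}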

Regards,

-B

On Mon, May 17, 2021 at 3:50 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Bashir,

    the behavior you describe is expected. DirectRunner splits the
    input work into bundles, and processing each bundle may result in
    zero, one, or more new bundles. The executor executes the work
    associated with these bundles, enqueuing newly produced bundles
    into a queue, until there are no unprocessed bundles left (that
    is, until the work has been completely done). It uses a fixed-size
    thread pool (whose size is defined by --targetParallelism) to
    consume and execute the work associated with these bundles, so the
    processing of bundles of the "Sleep" transform and the "Next"
    transform gets interleaved - not due to fusion, but due to limited
    parallelism. If you increase the parallelism beyond the total
    number of bundles in your `lines` PCollection, then I think you
    will see the result you expect.
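
    For reference, the parallelism can also be set programmatically
    instead of on the command line. A sketch (the value 16 is a
    placeholder - pick something larger than your total bundle count):

    import org.apache.beam.runners.direct.DirectOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    DirectOptions directOptions =
        PipelineOptionsFactory.fromArgs(args).as(DirectOptions.class);
    // Should exceed the number of input bundles, otherwise bundle
    // processing is serialized onto too few workers and interleaves.
    directOptions.setTargetParallelism(16);
    Pipeline pipeline = Pipeline.create(directOptions);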

    Best,

     Jan

    On 5/12/21 7:35 PM, Bashir Sadjad wrote:
    Thanks Kenn.

    On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:


        On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:

            However, if I add a dummy S2' after S2 (i.e.,
            S1->S2->S2'->S3) which only prints some log messages for
            each record and passes the record through to its output,
            then it seems S2 and S2' are fused, because the log
            messages are interleaved with the fetches.


            *Q1*: Does DirectRunner do any fusion optimization (e.g.,
            like DataflowRunner)? If not by default, is there any way
            to enable it?


        The Java DirectRunner does not do any fusion optimization, so
        there's no code to enable :-). Fusion should affect performance
        only, not semantics. The DirectRunner is known to have poor
        performance, but mostly no one is working on speeding it up
        because it is really just for small-scale testing.


    Here is a minimal pipeline (with no windowing) that demonstrates
    what I mean; maybe I am using the wrong terminology, but when I
    run this pipeline with DirectRunner (and with
    `--targetParallelism=1`), the `DEBUG INPUT` and `DEBUG NEXT`
    messages are interleaved. If there were no fusion, I would have
    expected to see all `DEBUG INPUT` messages first and then all of
    the `DEBUG NEXT` messages:

    Pipeline pipeline = Pipeline.create(options);
    PCollection<String> lines =
        pipeline.apply(TextIO.read().from(options.getInputFile()));

    PCollection<String> linesDelayed = lines.apply("Sleep",
        ParDo.of(new DoFn<String, String>() {
          @StartBundle
          public void startBundle() {
            log.info("INPUT: Started a new bundle");
          }

          @ProcessElement
          public void processElement(@Element String line,
              OutputReceiver<String> out) throws InterruptedException {
            log.info(String.format("DEBUG INPUT %s", line));
            Thread.sleep(3000);
            out.output(line);
          }
        }));

    PCollection<String> linesDebug = linesDelayed.apply("Next",
        ParDo.of(new DoFn<String, String>() {
          @StartBundle
          public void startBundle() {
            log.info("NEXT: Started a new bundle");
          }

          @ProcessElement
          public void processElement(@Element String line,
              OutputReceiver<String> out) {
            log.info(String.format("DEBUG NEXT %s", line));
            out.output(line);
          }
        }));

    linesDebug.apply(
        TextIO.write().to(options.getOutputFile()).withNumShards(1));

    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

    It seems that a few bundles are processed by the `Sleep`
    transform, then they all go through `Next`; then a few more
    bundles go through `Sleep`, then `Next`, and so on.

            The other issue is with triggers and pane creation. I have
            an extended version of this pipeline; a simplified view of
            it (sketched in code below) is: S1->S2A->GBK->S2B->S3

            S1: Like before
            S2A: Add a key to the output of S1
            GBK: Groups the output of S2A to remove duplicate keys
            S2B: Similar to S2 above, i.e., fetches the deduped URLs
            and creates Avro records
            S3: Same as before
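
            To make the shape concrete, here is a sketch of the S2A +
            GBK steps (the names s1Output and extractUrl are
            placeholders for illustration, not my real code):

            import org.apache.beam.sdk.transforms.GroupByKey;
            import org.apache.beam.sdk.transforms.MapElements;
            import org.apache.beam.sdk.values.KV;
            import org.apache.beam.sdk.values.PCollection;
            import org.apache.beam.sdk.values.TypeDescriptors;

            // S2A: key each record by its URL (extractUrl is a
            // hypothetical helper), then the GBK dedupes the keys.
            PCollection<KV<String, String>> keyed = s1Output.apply(
                "S2A",
                MapElements
                    .into(TypeDescriptors.kvs(
                        TypeDescriptors.strings(),
                        TypeDescriptors.strings()))
                    .via((String record) ->
                        KV.of(extractUrl(record), record)));
            PCollection<KV<String, Iterable<String>>> deduped =
                keyed.apply(GroupByKey.<String, String>create());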

            *Q2*: In this case, if I add a dummy S2B' after S2B, the
            log messages are /not/ interleaved with resource fetches,
            i.e., no fusion is happening. Why? What is different here?


        I don't quite understand what the problem is here.


    The same log-message interleaving does not happen in this case.
    So, back to my original example sketch: the log messages of S2'
    are interleaved with those of S2 (which I thought was because of
    fusion), but all of the log messages of S2B' are printed after
    all messages of S2B.

            *Q3*: Even if I add a similar trigger to the output of
            S2B, the Parquet file generation does not start until all
            of the fetches are done. Again, what is different here,
            and why are intermediate panes not fired while the output
            of S2B is being generated?


        I think it would help to see how you have configured the
        ParquetIO write transform.


    I think this is related to the difference between the behaviour
    of the two examples above (i.e., S2' vs. S2B'). If it turns out
    that is not the case, I will create a minimal example including
    ParquetIO too.
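
    In the meantime, the write is configured roughly like the sketch
    below (the schema, the method name, and the output path are
    placeholders, not my actual code):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.values.PCollection;

    // Sketch: write the Avro records produced by S2B as Parquet.
    static void writeParquet(
        PCollection<GenericRecord> records, Schema schema, String outputDir) {
      records.apply(
          FileIO.<GenericRecord>write()
              .via(ParquetIO.sink(schema))
              .to(outputDir)
              .withSuffix(".parquet")
              .withNumShards(1));
    }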

    Thanks again

    -B


        Kenn


            Thanks

            -B
            P.S. I need this pipeline to work both on a distributed
            runner and on a local machine with many cores. That's why
            the performance of DirectRunner is important to me.
