Thanks Kenn.

On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:

>
> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
>
>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
>> prints some log messages for each record and passes the record to output,
>> then it seems S2 and S2' are fused. Because the log messages are
>> interleaved with fetches.
>>
>
>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
>> DataflowRunner)? If not by default, is there any way to enable it?
>>
>
> The Java DirectRunner does not do any fusion optimization. There's no code
> to enable :-). It should affect performance only, not semantics. The
> DirectRunner is known to have poor performance, but mostly no one is
> working on speeding it up because it is really just for small-scale testing.
>

Here is a minimal pipeline (with no windowing) that demonstrates what I
mean. Maybe I am using the wrong terminology, but when I run this pipeline
with DirectRunner (and with `--targetParallelism=1`), the `DEBUG INPUT` and
`DEBUG NEXT` messages are interleaved. If there were no fusion, I would
have expected to see all of the `DEBUG INPUT` messages first and then all
of the `DEBUG NEXT` messages:

Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines =
    pipeline.apply(TextIO.read().from(options.getInputFile()));

// First stage: log each line, sleep to simulate a slow fetch, pass it on.
PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(
    new DoFn<String, String>() {
      @StartBundle
      public void startBundle() {
        log.info("INPUT: Started a new bundle");
      }

      @ProcessElement
      public void processElement(@Element String line, OutputReceiver<String> out)
          throws InterruptedException {
        log.info(String.format("DEBUG INPUT %s", line));
        Thread.sleep(3000);
        out.output(line);
      }
    }));

// Second stage: only log each line and pass it on unchanged.
PCollection<String> linesDebug = linesDelayed.apply("Next", ParDo.of(
    new DoFn<String, String>() {
      @StartBundle
      public void startBundle() {
        log.info("NEXT: Started a new bundle");
      }

      @ProcessElement
      public void processElement(@Element String line, OutputReceiver<String> out) {
        log.info(String.format("DEBUG NEXT %s", line));
        out.output(line);
      }
    }));

linesDebug.apply(TextIO.write().to(options.getOutputFile()).withNumShards(1));

PipelineResult result = pipeline.run();
result.waitUntilFinish();

It seems that a few bundles are processed by the `Sleep` transform, and
then they all go through `Next`. Then a few more bundles go through `Sleep`
and then `Next`, and so on.
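
As a rough illustration of the log pattern described above, here is a toy
model in plain Java with no Beam dependency. The class name
`BundleInterleavingSketch`, the `run` helper, and the fixed bundle size are
all hypothetical, and this is not a description of how DirectRunner actually
schedules work. It only shows that running two stages one bundle at a time
would produce the same kind of interleaving, whether or not the stages are
fused.

```java
import java.util.ArrayList;
import java.util.List;

public class BundleInterleavingSketch {

  // Toy model (an assumption, not DirectRunner's scheduler): run two stages
  // over the input one bundle at a time and return the log lines in order.
  static List<String> run(List<String> input, int bundleSize) {
    List<String> logLines = new ArrayList<>();
    for (int start = 0; start < input.size(); start += bundleSize) {
      int end = Math.min(start + bundleSize, input.size());
      List<String> bundle = input.subList(start, end);

      // Stage 1 (standing in for "Sleep") consumes the whole bundle first
      // and materializes its output.
      List<String> stage1Output = new ArrayList<>();
      for (String line : bundle) {
        logLines.add("DEBUG INPUT " + line);
        stage1Output.add(line);
      }

      // Stage 2 (standing in for "Next") runs only after stage 1 has
      // finished this bundle.
      for (String line : stage1Output) {
        logLines.add("DEBUG NEXT " + line);
      }
    }
    return logLines;
  }

  public static void main(String[] args) {
    // With bundles of two elements, INPUT and NEXT alternate per bundle.
    run(List.of("a", "b", "c", "d"), 2).forEach(System.out::println);
  }
}
```

In this model, a bundle size equal to the input size gives all of the
`DEBUG INPUT` lines before any `DEBUG NEXT` line, while a smaller bundle
size gives the per-bundle interleaving.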


>
>
>
>> The other issue is with triggers and creating panes. I have an extended
>> version of this pipeline where a simplified view of it is:
>> S1->S2A->GBK->S2B->S3
>>
>> S1: Like before
>> S2A: Add a key to the output of S1
>> GBK: Groups output of S2A to remove duplicate keys
>> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
>> S3: Same as before
>>
>> *Q2*: In this case, if I add a dummy S2B' after S2B, the log messages
>> are *not* interleaved with resource fetches, i.e., no fusion is
>> happening. Why? What is different here?
>>
>
> I don't quite understand what the problem is here.
>

The same log message interleaving does not happen in this case. Going back
to my original example sketch, the log messages of S2' are interleaved with
those of S2 (which I thought was because of fusion), but all of the log
messages of S2B' are printed after all of the messages of S2B.


>
>
>
>> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet
>> file generation does not start until all of the fetches are done. Again,
>> what is different here, and why are intermediate panes not fired while the
>> output of S2B is being generated?
>>
>
> I think it would help to see how you have configured the ParquetIO write
> transform.
>

I think this is related to the difference between the behaviour of the two
examples above (i.e., S2' vs. S2B'). If it turns out that is not the case,
I will create a minimal example including ParquetIO too.

Thanks again

-B


>
> Kenn
>
>
>>
>> Thanks
>>
>> -B
>> P.S. I need this pipeline to work both on a distributed runner and also
>> on a local machine with many cores. That's why the performance of
>> DirectRunner is important to me.
>>
>
