Thanks Kenn.

On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>
> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
>
>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
>> prints some log messages for each record and passes the record to output,
>> then it seems S2 and S2' are fused. Because the log messages are
>> interleaved with fetches.
>>
>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
>> DataflowRunner)? If not by default, is there any way to enable it?
>>
>
> The Java DirectRunner does not do any fusion optimization. There's no code
> to enable :-). It should affect performance only, not semantics. The
> DirectRunner is known to have poor performance, but mostly no one is
> working on speeding it up because it is really just for small-scale testing.
>

Here is a minimal pipeline (with no windowing) that demonstrates what I
mean. Maybe I am using the wrong terminology, but when I run this pipeline
with DirectRunner (and with `--targetParallelism=1`), the `DEBUG INPUT` and
`DEBUG NEXT` messages are interleaved.
If there were no fusion, I would have expected to see all `DEBUG INPUT`
messages first, followed by all `DEBUG NEXT` messages:

    Pipeline pipeline = Pipeline.create(options);
    PCollection<String> lines =
        pipeline.apply(TextIO.read().from(options.getInputFile()));

    // "Sleep": logs each line, waits 3 seconds, then passes the line on.
    PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(
        new DoFn<String, String>() {
          @StartBundle
          public void startBundle() {
            log.info("INPUT: Started a new bundle");
          }

          @ProcessElement
          public void processElement(
              @Element String line, OutputReceiver<String> out)
              throws InterruptedException {
            log.info(String.format("DEBUG INPUT %s", line));
            Thread.sleep(3000);
            out.output(line);
          }
        }));

    // "Next": only logs each line and passes it on unchanged.
    PCollection<String> linesDebug = linesDelayed.apply("Next", ParDo.of(
        new DoFn<String, String>() {
          @StartBundle
          public void startBundle() {
            log.info("NEXT: Started a new bundle");
          }

          @ProcessElement
          public void processElement(
              @Element String line, OutputReceiver<String> out) {
            log.info(String.format("DEBUG NEXT %s", line));
            out.output(line);
          }
        }));

    linesDebug.apply(
        TextIO.write().to(options.getOutputFile()).withNumShards(1));

    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

It seems that a few bundles are processed by the `Sleep` transform, then
they all go through `Next`. Then a few more bundles go through `Sleep`,
then `Next`, and so on.

>
>
>> The other issue is with triggers and creating panes. I have an extended
>> version of this pipeline where a simplified view of it is:
>> S1->S2A->GBK->S2B->S3
>>
>> S1: Like before
>> S2A: Add a key to the output of S1
>> GBK: Groups output of S2A to remove duplicate keys
>> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
>> S3: Same as before
>>
>> *Q2*: In this case, if I add a dummy S2B' after S2', the log messages
>> are *not* interleaved with resource fetches, i.e., no fusion is
>> happening. Why? What is different here?
>>
>
> I don't quite understand what the problem is here.
>

The same log message interleaving does not happen in this case.
Going back to my original example sketch, the log messages of S2' are
interleaved with those of S2 (which I thought was because of fusion), but
all of the log messages of S2B' are printed after all messages of S2B.

>
>
>> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet
>> file generation does not start until all of the fetches are done. Again,
>> what is different here and why intermediate panes are not fired while the
>> output of S2B is being generated?
>>
>
> I think it would help to see how you have configured the ParquetIO write
> transform.
>

I think this is related to the difference between the behaviour of the two
examples above (i.e., S2' vs. S2B'). If it turns out that this is not the
case, I will create a minimal example including ParquetIO too.

Thanks again

-B

>
> Kenn
>
>
>>
>> Thanks
>>
>> -B
>> P.S. I need this pipeline to work both on a distributed runner and also
>> on a local machine with many cores. That's why the performance of
>> DirectRunner is important to me.
>>
>
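
To make the two log orderings discussed in this message concrete, here is a
small stdlib-only Java sketch. It is not Beam code and makes no claim about
how DirectRunner actually schedules work or about the cause of either
behaviour. The class name `BundleOrderSketch`, the method names, and the
bundle size are all hypothetical and exist only for illustration. It shows
that running a downstream step bundle by bundle already produces
interleaved `DEBUG INPUT` / `DEBUG NEXT` messages, whereas finishing one
step over the whole input before starting the next does not.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: mimics log ordering, not Beam's execution model. */
class BundleOrderSketch {

  /** Finishes the first step over all input before starting the second. */
  static List<String> stageAtATime(List<String> input) {
    List<String> log = new ArrayList<>();
    for (String line : input) {
      log.add("DEBUG INPUT " + line);
    }
    for (String line : input) {
      log.add("DEBUG NEXT " + line);
    }
    return log;
  }

  /** Runs the first step on one bundle, then the second step on it, and repeats. */
  static List<String> bundleAtATime(List<String> input, int bundleSize) {
    if (bundleSize <= 0) {
      throw new IllegalArgumentException("bundleSize must be positive");
    }
    List<String> log = new ArrayList<>();
    for (int start = 0; start < input.size(); start += bundleSize) {
      int end = Math.min(start + bundleSize, input.size());
      List<String> bundle = input.subList(start, end);
      for (String line : bundle) {
        log.add("DEBUG INPUT " + line);
      }
      for (String line : bundle) {
        log.add("DEBUG NEXT " + line);
      }
    }
    return log;
  }

  public static void main(String[] args) {
    List<String> input = List.of("a", "b", "c", "d");
    System.out.println(stageAtATime(input));
    System.out.println(bundleAtATime(input, 2));
  }
}
```

With a bundle size of 1, `bundleAtATime` alternates `DEBUG INPUT` and
`DEBUG NEXT` for every element, which is the pattern one might attribute
to fusion when reading the logs.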