+1 to using pipeline options. Alternatively, you can change your KafkaReadTransform so that its expand() override performs a different expansion depending on your pipeline options.
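Here is a minimal sketch of that expand()-based alternative. It assumes the Beam Java SDK and, for `main`, the DirectRunner on the classpath. `LocalTestOptions`, its `--localTest` flag, and `SwitchableRead` are hypothetical names, and `Create.of(...)` stands in for both the local-file read and the Kafka read. Because expand() applies only the branch that is selected, the other transform is never applied, so its expand() is never called.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

public class SwitchableReadExample {

  /** Hypothetical option; set it with --localTest=true on the command line. */
  public interface LocalTestOptions extends PipelineOptions {
    @Description("Read from the local test source instead of the production source")
    @Default.Boolean(false)
    boolean getLocalTest();

    void setLocalTest(boolean value);
  }

  /**
   * Hypothetical composite read whose expand() applies exactly one of two reads,
   * chosen from the pipeline options. The unchosen read is never applied.
   */
  public static class SwitchableRead<T> extends PTransform<PBegin, PCollection<T>> {
    private final PTransform<PBegin, PCollection<T>> localRead;
    private final PTransform<PBegin, PCollection<T>> productionRead;

    public SwitchableRead(
        PTransform<PBegin, PCollection<T>> localRead,
        PTransform<PBegin, PCollection<T>> productionRead) {
      this.localRead = localRead;
      this.productionRead = productionRead;
    }

    @Override
    public PCollection<T> expand(PBegin input) {
      // The pipeline options are reachable from the input during construction.
      LocalTestOptions options = input.getPipeline().getOptions().as(LocalTestOptions.class);
      if (options.getLocalTest()) {
        return input.apply("LocalRead", localRead);
      }
      return input.apply("ProductionRead", productionRead);
    }
  }

  public static void main(String[] args) {
    LocalTestOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(LocalTestOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    // Create.of(...) stands in for both reads; in a real pipeline the second
    // argument would be the actual KafkaReadTransform.
    PCollection<String> records =
        pipeline.apply(
            new SwitchableRead<String>(
                Create.of("local-record-1", "local-record-2"),
                Create.of("kafka-record")));
    // records.apply(/* rest of pipeline */);

    pipeline.run().waitUntilFinish();
  }
}
```

Wrapping both reads in one composite keeps the switch in a single place. Putting the same options check directly inside KafkaReadTransform.expand() would work the same way.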
On Tue, Apr 20, 2021 at 9:51 PM Reuven Lax <re...@google.com> wrote:

> It would be simpler to create a custom pipeline option, and swap out the
> read transform in your code. For example:
>
>   PCollection<Type> pc;
>   if (options.getLocalTest()) {
>     pc = pipeline.apply(new ReadFromLocalFile());
>   } else {
>     pc = pipeline.apply(new KafkaReadTransform());
>   }
>
>   pc.apply(/* rest of pipeline */);
>
> On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng <mabelyuhong0...@gmail.com>
> wrote:
>
>> We want to support transform overrides when testing locally. For
>> example, in the real pipelines we read from Kafka, but in local tests we
>> want to read from a local file to check whether the pipeline works
>> correctly. So we want to override the Kafka read transform directly
>> instead of writing the pipeline twice.
>>
>> Code example:
>>
>>   public Pipeline createPipeline(Pipeline pipeline) {
>>     pipeline.apply(new KafkaReadTransform())
>>         .apply(/* other transforms... */);
>>     return pipeline;
>>   }
>>
>> In tests, we will use the same createPipeline() function to create the
>> pipeline, but we want to override KafkaReadTransform with another
>> transform to avoid reading from Kafka.
>>
>> Thanks,
>> Yuhong
>>
>> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> In general, TransformOverrides are expected to be per-runner
>>> implementation details and are not expected to be used directly by
>>> end users.
>>> What is the exact use case you are trying to achieve? Are you running
>>> into a missing feature of an existing transform?
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng <mabelyuhong0...@gmail.com>
>>> wrote:
>>>
>>>> Hi Beam,
>>>> We have a use case where, when creating a pipeline, we want to replace
>>>> the IO read/write transforms during testing using
>>>> `pipeline.replaceAll(overrides)`. However, we ran into some problems:
>>>> 1. Is there any way to avoid calling expand() on a transform that is
>>>>    going to be replaced? We want to override the transform precisely
>>>>    because its expand() is not available in some situations. It does
>>>>    not seem reasonable to call expand() on the original transform and
>>>>    then call expand() on the override transform as well.
>>>> 2. When trying to implement `PTransformOverrideFactory`, we realized
>>>>    that the inputs are `TaggedPValue`s, which can only hold {TupleTag,
>>>>    PCollection} pairs. If we want to override a write transform whose
>>>>    output type is `PDone`, what's the best way to implement this
>>>>    factory?
>>>>
>>>> Thanks in advance for your answers! This is quite important to our
>>>> pipelines.
>>>>
>>>> Thanks,
>>>> Yuhong