I don't know if TransformOverrides is the correct way to do this. Will one of the following options work?

(1) Update the pipeline to have a "test" mode where you would read from a
file-based source instead of Kafka (a rough sketch is below).
(2) Run a local Kafka instance for the test pipeline instead of using the
instance with production data.
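For (1), one approach is to let your createPipeline() take the read
transform as a parameter, so the test can inject a file-based or in-memory
source while the production entry point passes the Kafka read. A rough,
untested sketch (KafkaReadTransform is the placeholder from your example
and is assumed to be a PTransform<PBegin, PCollection<String>>; I'm also
assuming String elements; PTransform, PBegin, PCollection, TextIO and
Create are the standard Beam Java SDK classes):

  public static Pipeline createPipeline(
      Pipeline pipeline,
      PTransform<PBegin, PCollection<String>> readTransform) {
    // The source is injected by the caller instead of being hard-coded.
    PCollection<String> records = pipeline.apply("Read", readTransform);
    // ... apply the rest of the pipeline to `records` here ...
    return pipeline;
  }

  // Production entry point:
  createPipeline(pipeline, new KafkaReadTransform());

  // Test, reading from a local file (no Kafka involved):
  createPipeline(pipeline, TextIO.read().from("/path/to/test-input.txt"));
  // or with in-memory data:
  createPipeline(pipeline, Create.of("test-record-1", "test-record-2"));

For (2), the same parameterization works: the test could pass a
KafkaIO.read() configured via withBootstrapServers() to point at a local
broker instead of the production cluster.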
Thanks,
Cham

On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng <[email protected]> wrote:

> We want to support transform overrides when doing tests locally. For
> example, in real pipelines we read from Kafka, but when doing tests
> locally we want to read from a local file to help test whether the
> pipeline works fine. So we want to override the Kafka read transform
> directly instead of writing the pipeline twice.
>
> Code example:
>
>     public Pipeline createPipeline(Pipeline pipeline) {
>       pipeline.apply(new KafkaReadTransform())
>               .apply(/* other transforms... */);
>       return pipeline;
>     }
>
> In tests, we will use the same createPipeline() function to create the
> pipeline, but we want to override KafkaReadTransform with another
> transform to avoid reading from Kafka.
>
> Thanks,
> Yuhong
>
> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> In general, TransformOverrides are expected to be per-runner
>> implementation details and are not expected to be used directly by
>> end users.
>> What is the exact use case you are trying to achieve? Are you running
>> into a missing feature of an existing transform?
>>
>> Thanks,
>> Cham
>>
>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng <[email protected]>
>> wrote:
>>
>>> Hi Beam,
>>> We have a use case where, when creating a pipeline, we want to replace
>>> the IO read/write transforms for testing using
>>> `pipeline.replaceAll(overrides)`. However, we ran into some problems
>>> when doing tests:
>>> 1. Is there any way to avoid calling expand() of a transform when it
>>> is going to be replaced? The reason we want to override a transform is
>>> that its expand() is not available in some situations. It does not
>>> seem reasonable to call the expand() of the originalTransform and then
>>> call the expand() of the overrideTransform again.
>>> 2. When trying to implement `PTransformOverrideFactory`, we realized
>>> that the inputs are `TaggedPValue`s, which can only form {TupleTag,
>>> PCollection} pairs. If we want to override a write transform whose
>>> output type is `PDone`, what's the best way to implement this factory?
>>>
>>> Thanks in advance for any answers! This is quite important to our
>>> pipelines.
>>>
>>> Thanks,
>>> Yuhong
>>>
>>
