It would be simpler to create a custom pipeline option and swap out the
read transform in your code. For example:
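PCollection<Type> pc;
if (options.getLocalTest()) {
  // Local test run: read from a file instead of Kafka.
  pc = pipeline.apply(new ReadFromLocalFile());
} else {
  // Production run: read from Kafka.
  pc = pipeline.apply(new KafkaReadTransform());
}
// Everything downstream of the read is shared by both branches.
pc.apply(/* rest of pipeline */);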
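A slightly fuller sketch of the same idea, using only the core Beam Java SDK. It assumes a hypothetical `MyOptions` interface (only `getLocalTest()` comes from the snippet above; `localInput`, `parseOptions`, `chooseSource`, and the `Length` step are illustrative names, not existing Beam APIs). It also moves the shared body into a `createPipeline` method that takes the source as a parameter, so tests and production build the same graph without any transform overrides. The Kafka read is supplied by the caller, so this block does not depend on the Kafka IO module.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class SwappableSourcePipeline {

  /** Hypothetical options interface; only getLocalTest() mirrors the snippet above. */
  public interface MyOptions extends PipelineOptions {
    @Description("Read from a local file instead of the production source")
    @Default.Boolean(false)
    boolean getLocalTest();

    void setLocalTest(boolean value);

    @Description("File pattern read when --localTest=true (hypothetical option)")
    @Default.String("input.txt")
    String getLocalInput();

    void setLocalInput(String value);
  }

  /** Parses command-line flags such as --localTest=true into MyOptions. */
  public static MyOptions parseOptions(String... args) {
    return PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
  }

  /** Returns a local TextIO read when --localTest is set, otherwise the production source. */
  public static PTransform<PBegin, PCollection<String>> chooseSource(
      MyOptions options, PTransform<PBegin, PCollection<String>> productionSource) {
    if (options.getLocalTest()) {
      // Building the transform does not touch the file; it is only read when the pipeline runs.
      return TextIO.read().from(options.getLocalInput());
    }
    return productionSource;
  }

  /** Shared pipeline body: the source is a parameter, so tests and production reuse it. */
  public static PCollection<Integer> createPipeline(
      Pipeline pipeline, PTransform<PBegin, PCollection<String>> source) {
    return pipeline
        .apply("Read", source)
        // Hypothetical stand-in for "the rest of the pipeline": emit each line's length.
        .apply(
            "Length",
            MapElements.into(TypeDescriptors.integers()).via((String line) -> line.length()));
  }
}
```

In production you might call `createPipeline(pipeline, chooseSource(options, new KafkaReadTransform()))`. In a unit test you could skip the option entirely and pass `Create.of(...)` straight into `createPipeline` on a `TestPipeline`, which keeps the read swap out of the pipeline code.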
On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng <[email protected]> wrote:
> We want to support transform overrides when testing locally. For
> example, our real pipelines read from Kafka, but when testing locally we
> want to read from a local file to check that the pipeline works. So we
> want to override the Kafka read transform directly instead of writing the
> pipeline twice.
>
> Code example:
>
> public Pipeline createPipeline(Pipeline pipeline) {
>   pipeline.apply(new KafkaReadTransform()).apply(/* other functions... */);
>   return pipeline;
> }
> In tests, we will use the same createPipeline() function to build the
> pipeline, but we want to override KafkaReadTransform with another
> transform so that the test does not read from Kafka.
>
> Thanks,
> Yuhong
>
> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <[email protected]> wrote:
>
>> In general, TransformOverrides are expected to be per-runner
>> implementation details and are not meant to be used directly by
>> end users.
>> What exact use case are you trying to address? Are you running
>> into a missing feature of an existing transform?
>>
>> Thanks,
>> Cham
>>
>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng <[email protected]> wrote:
>>
>>> Hi Beam,
>>> We have a use case where, when creating a pipeline, we want to replace
>>> the IO read/write transforms during testing using
>>> `pipeline.replaceAll(overrides)`. However, we ran into some problems:
>>> 1. Is there any way to avoid calling expand() on a transform that is
>>> going to be replaced? The reason we want to override the transform is
>>> that its expand() is not available in some situations. Is it really
>>> necessary to call expand() on the original transform and then call
>>> expand() again on the override transform?
>>> 2. When trying to implement `PTransformOverrideFactory`, we noticed
>>> that the inputs are `TaggedPValue`s, which can only hold {TupleTag,
>>> PCollection} pairs. So if we want to override a write transform whose
>>> output type is `PDone`, what's the best way to implement this factory?
>>>
>>> Thanks in advance for answers! This is quite important to our pipelines.
>>>
>>> Thanks,
>>> Yuhong
>>>
>>