I don't know if TransformOverrides is the correct way to do this. Would one
of the following options work?

(1) Update the pipeline to have a "test" mode where you would read from a
file-based source instead of Kafka.
(2) Run a local Kafka instance for the test pipeline instead of using the
instance with production data.
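
Option (1) could be sketched roughly as follows. This is only an illustration of the idea, not Beam code: plain Java lists stand in for PCollections, and `TestModePipeline`, `kafkaSource`, `fileSource`, and `runPipeline` are hypothetical names invented for the example. The point is that the pipeline body is written once and the read step is passed in, so a test can supply a file-based source.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class TestModePipeline {

    // Hypothetical stand-in for the production Kafka read (not a Beam API).
    // It fails on purpose to show that test mode never touches it.
    static Supplier<List<String>> kafkaSource() {
        return () -> {
            throw new IllegalStateException("Kafka is not reachable in this sketch");
        };
    }

    // File-based source for "test" mode: one record per line.
    static Supplier<List<String>> fileSource(Path path) {
        return () -> {
            try {
                return Files.readAllLines(path);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    // The pipeline body is defined once; only the source is swapped.
    static List<String> runPipeline(Supplier<List<String>> source) {
        return source.get().stream()
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Test mode: write a small local file and read from it instead of Kafka.
        Path input = Files.createTempFile("records", ".txt");
        Files.write(input, List.of("alpha", "", " beta "));
        System.out.println(runPipeline(fileSource(input)));
    }
}
```

In Beam terms, the equivalent would presumably be for createPipeline() to accept the read step as a parameter (for example a `PTransform<PBegin, PCollection<T>>`), with production code passing the Kafka read and tests passing a file-based or in-memory source.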

Thanks,
Cham

On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng <[email protected]>
wrote:

> We want to support transform overrides when testing locally. For
> example, in real pipelines we read from Kafka, but when testing
> locally we want to read from a local file to check whether the
> pipeline works correctly. So we want to override the Kafka read transform
> directly instead of writing the pipeline twice.
>
> code example:
>
> public Pipeline createPipeline(Pipeline pipeline) {
>    pipeline.apply(new KafkaReadTransform()).apply(/* other functions.. */);
>    return pipeline;
> }
> In tests, we will use the same createPipeline() function to create the
> pipeline, but we want to override KafkaReadTransform with another
> transform to avoid reading from Kafka.
>
> Thanks,
> Yuhong
>
> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> In general, TransformOverrides are expected to be per-runner
>> implementation details and are not expected to be directly used by
>> end-users.
>> What is the exact use case you are trying to achieve? Are you running
>> into a missing feature of an existing transform?
>>
>> Thanks,
>> Cham
>>
>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng <[email protected]>
>> wrote:
>>
>>> Hi Beam,
>>> We have a use case where, when creating a pipeline for testing, we want
>>> to replace the IO read/write transforms using
>>> `pipeline.replaceAll(overrides)`. However, we ran into some problems:
>>> 1. Is there any way to avoid calling expand() on a transform that is
>>> going to be replaced? We want to override the transform precisely
>>> because its expand() is not available in some situations. It does not
>>> seem reasonable to call expand() on the originalTransform and then call
>>> expand() on the overrideTransform as well.
>>> 2. When trying to implement `PTransformOverrideFactory`, we realized
>>> that the inputs are `TaggedPValue`, which can only form {TupleTag,
>>> PCollection} pairs. If we want to override a write transform whose
>>> output type is `PDone`, what is the best way to implement this factory?
>>>
>>> Thanks in advance for answers! This is quite important to our pipelines.
>>>
>>> Thanks,
>>> Yuhong
>>>
>>
