+1 to use pipeline options.

 Alternatively, you can also change your KafkaReadTransform to perform
different expansion(override expand()) based on your pipeline options.

On Tue, Apr 20, 2021 at 9:51 PM Reuven Lax <re...@google.com> wrote:

> It would be simpler to create a custom pipeline option, and swap out the
> read transform in your code. For example
>
> PCollection<Type> pc;
> if (options.getLocalTest()) {
>   pc = pipeline.apply(new ReadFromLocalFile());
> } else {
>   pc = pipeline.apply(new KafkaReadTrasnform());
> }
>
> pc.apply(/* rest of pipeline */);
>
> On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng <mabelyuhong0...@gmail.com>
> wrote:
>
>> We want to support transform override when doing tests locally.  For
>> example, in real pipelines, we read from Kafka, but when doing tests
>> locally, we want to read from a local file to help test whether the
>> pipeline works fine. So we want to override the Kafka read transform
>> directly instead of writing the pipeline twice.
>>
>> code example:
>>
>> public Pipeline createPipeline(Pipeline pipeline) {
>>
>>    pipeline.apply(new KafkaReadTransform()).apply(// other functions..);
>> }
>> In test, we will use the same createPipeline() function to create a
>> pipeline but meanwhile we want to override KafkaReadTransform with another
>> transform to avoid reading from Kafka.
>>
>> Thanks,
>> Yuhong
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> In general, TransformOverrides are expected to be per-runner
>>> implementation details and are not expected to be directly used by
>>> end-users.
>>> What is the exact use-case you are trying to achieve ? Are you running
>>> into a missing feature of an existing transform ?
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng <mabelyuhong0...@gmail.com>
>>> wrote:
>>>
>>>> Hi Beam,
>>>> We have a use case when creating a pipeline, we want to replace the IO
>>>> read/write transform when testing using `pipeline.replaceAll(overrides)`.
>>>> However, we met some problems when doing tests:
>>>> 1. Are there any ways we can avoid calling expand() of a transform when
>>>> it is going to be replaced?  The reason we want to override a transform is
>>>> because that the expand() of this transform is somehow not available in
>>>> some situations. It seems not reasonable enough to call the expand() of the
>>>> originalTransform and then call the expand() of the overrideTransform
>>>> again?
>>>> 2. When trying to implement `PTransformOverrideFactory`, we realize
>>>> that the inputs are `TaggedPValue`, which can only make {Tuple,
>>>> PCollection} pairs. Then if we want to override a write transform whose
>>>> output type is `PDone`, what's the best way to implement this factory?
>>>>
>>>> Thanks in advance for answers! This is quite important to our pipelines.
>>>>
>>>> Thanks,
>>>> Yuhong
>>>>
>>>

Reply via email to