+dev <dev@beam.apache.org>

Hi Yu,
Which runner are you using for your pipeline? It would also help if you could
share your pipeline code.
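
Separately, one detail in the pseudocode below: with an `OffsetRange` restriction, `tryClaim` takes a `Long` offset, so the loop would need to claim something like the message's offset rather than the message itself. To show how that claim loop behaves, here is a small, self-contained model written under stated assumptions. `OffsetRangeTracker`, `splitAt` and `process` below are hypothetical stand-ins, not Beam's classes. They only mimic the rule that offsets are claimed in increasing order and that a claim fails once the offset reaches the (possibly shrunk) end of the range. The model says nothing about how outputs are batched downstream; that is likely runner-specific, which is why the runner question above matters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the claim loop in the pseudocode; NOT Beam's OffsetRangeTracker.
public class ClaimLoopModel {

    public static final class OffsetRangeTracker {
        private final long from;
        private long to;                    // exclusive end; may shrink when the runner splits
        private Long lastAttempted = null;  // last offset passed to tryClaim

        public OffsetRangeTracker(long from, long to) {
            this.from = from;
            this.to = to;
        }

        // Succeeds while offset < to; rejects offsets that are out of order or before the start.
        public boolean tryClaim(long offset) {
            if (offset < from) {
                throw new IllegalArgumentException("Offset " + offset + " is before range start " + from);
            }
            if (lastAttempted != null && offset <= lastAttempted) {
                throw new IllegalArgumentException("Offsets must be claimed in increasing order, got " + offset);
            }
            lastAttempted = offset;
            return offset < to;
        }

        // Assumed stand-in for a runner checkpoint: offsets at or after splitPos move to a residual.
        public void splitAt(long splitPos) {
            to = Math.max(from, Math.min(to, splitPos));
        }
    }

    // Claims each offset, "emits" it right away, and stops at the first failed claim.
    public static List<Long> process(OffsetRangeTracker tracker, long[] messageOffsets) {
        List<Long> emitted = new ArrayList<>();
        for (long offset : messageOffsets) {
            if (!tracker.tryClaim(offset)) {
                break; // corresponds to returning ProcessContinuation.stop()
            }
            emitted.add(offset); // corresponds to receiver.outputWithTimestamp(...)
        }
        return emitted;
    }

    public static void main(String[] args) {
        OffsetRangeTracker tracker = new OffsetRangeTracker(0, Long.MAX_VALUE);
        tracker.splitAt(3); // pretend the runner checkpointed at offset 3
        System.out.println(process(tracker, new long[] {0, 1, 2, 3, 4}));
    }
}
```

Running `main` prints `[0, 1, 2]`: after the simulated checkpoint the claim at offset 3 fails, which in the real `DoFn` corresponds to returning `ProcessContinuation.stop()`.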

On Mon, Jan 25, 2021 at 10:19 PM <yu.b.zh...@oracle.com> wrote:

> Hi Beam Community,
>
> I have a splittable `DoFn` that reads messages from a stream and outputs
> the results downstream. The pseudocode looks like this:
>
> @DoFn.ProcessElement
> public DoFn.ProcessContinuation processElement(
>         @DoFn.Element SourceDescriptor sourceDescriptor,
>         RestrictionTracker<OffsetRange, Long> tracker,
>         WatermarkEstimator watermarkEstimator,
>         DoFn.OutputReceiver<Record> receiver) throws Exception {
>     while (true) {
>         // Message, Record and getMessageFromStream() are application-specific.
>         List<Message> messages = getMessageFromStream();
>         if (messages.isEmpty()) {
>             // Nothing available yet: checkpoint and let the runner resume later.
>             return DoFn.ProcessContinuation.resume();
>         }
>         for (Message message : messages) {
>             // An OffsetRange tracker claims Long offsets; getOffset() is assumed.
>             if (!tracker.tryClaim(message.getOffset())) {
>                 return DoFn.ProcessContinuation.stop();
>             }
>             Record record = new Record(message);
>             receiver.outputWithTimestamp(record, message.getTimestamp());
>         }
>     }
> }
>
>
> I expected each output to appear downstream immediately, but the results
> are grouped into batches (of 4 or 5 outputs) before being emitted
> downstream. Is this batch size configurable in the `DoFn` or in the runner?
>
> Thanks for any answer,
> Yu
