Hi Boyuan,

Thanks for replying. We are using Beam 2.25.0 with the direct runner for
testing. We are trying to develop an unbounded streaming service connector with
a splittable DoFn. In our Connector.read(), we want to commit each message back
to the stream after outputting the record to the downstream user pipeline. The
read transform and the user pipeline look like this:
public class Connector {
    public static Connector.Read read() {
        return new AutoValue_Connector_Read.Builder()
                .setStream("")
                .setStreamPartitions(Collections.singletonList(0))
                .build();
    }

    @AutoValue
    public abstract static class Read extends PTransform<PBegin, PCollection<Record>> {
        @Override
        public PCollection<Record> expand(PBegin input) {
            PCollection<SourceDescriptor> output = input.getPipeline()
                    .apply(Impulse.create())
                    .apply(ParDo.of(new GenerateSourceDescriptor(this)));

            // then apply the SDF read DoFn on it
            return output.apply(ParDo.of(new ReadDoFn(this)));
        }
    }
}

@DoFn.UnboundedPerElement
class ReadDoFn extends DoFn<SourceDescriptor, Record> {
    @ProcessElement
    public ProcessContinuation processElement(@Element SourceDescriptor sourceDescriptor,
                                              RestrictionTracker<OffsetRange, Long> tracker,
                                              OutputReceiver<Record> receiver) {

        while (true) {
            List<Message> messages = getMessageFromStream(cursor);
            if (messages.isEmpty()) {
                return DoFn.ProcessContinuation.resume();
            }
            for (Message message : messages) {
                if (!tracker.tryClaim(message)) {
                    return DoFn.ProcessContinuation.stop();
                }

                Record record = new Record(message);
                // output to user pipeline
                receiver.outputWithTimestamp(record, Instant.now());

            }
            // commit this batch of messages and get the updated cursor to read the next batch
            cursor = commitMessage();

        }
    }
}

//////////////// pipeline that uses Connector.read() to read from the stream ////////////////

class UserPipeline {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        p.getOptions().as(StreamingOptions.class).setStreaming(true);

        PCollection<KV<String, String>> output =
                p.apply("Read Stream", Connector.read().setStream("stream1"))
                        .apply("Log Record", ParDo.of(new DoFn<Record, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(@Element Record input,
                                                       OutputReceiver<KV<String, String>> out) {
                                System.out.printf("[User Pipeline] received offset %s : %s : %s \n",
                                        input.getOffset(), input.getKV().getKey(), input.getKV().getValue());
                                out.output(input.getKV());
                            }
                        }));
    }
}

Since we commit the messages after `outputReceiver.output()` and use the cursor
from the commit response to fetch the next batch, a problem arises if
`outputReceiver.output()` does not emit immediately. If it buffers messages 0,
1, 2 and the user pipeline then stops and restarts, messages 0 and 1 are lost:
`outputReceiver.output()` has not emitted them, but they have already been
committed in the connector.
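
To make the ordering problem concrete, here is a minimal plain-Java model of
it. It is only a sketch and does not use Beam at all: `FakeStream`,
`runWithCrash`, the batch size of 3 and the stream length of 5 are all
hypothetical names and values chosen for illustration. It compares committing
while outputs are still buffered against committing only after they have
reached downstream.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the scenario; none of these names are Beam APIs. */
public class CommitOrderingSketch {

    /** Hypothetical stand-in for the stream: it only remembers the committed cursor. */
    static class FakeStream {
        private final int size;
        private int committedCursor = 0; // offset a restarted reader resumes from

        FakeStream(int size) { this.size = size; }

        /** Returns up to max offsets starting at from. */
        List<Integer> readBatch(int from, int max) {
            List<Integer> batch = new ArrayList<>();
            for (int i = from; i < Math.min(size, from + max); i++) {
                batch.add(i);
            }
            return batch;
        }

        void commit(int nextOffset) { committedCursor = nextOffset; }

        int committedCursor() { return committedCursor; }
    }

    /**
     * Reads one batch, holds it in a buffer (as a runner might), then "crashes"
     * before the buffer is flushed. After the restart the rest of the stream is
     * read and delivered. Returns every offset that downstream ends up seeing.
     */
    public static List<Integer> runWithCrash(boolean commitBeforeFlush) {
        FakeStream stream = new FakeStream(5);
        List<Integer> downstream = new ArrayList<>();

        // First attempt: offsets 0..2 are read and buffered, not yet delivered.
        List<Integer> buffered = stream.readBatch(stream.committedCursor(), 3);
        if (commitBeforeFlush) {
            stream.commit(3); // committed while the outputs are only buffered
        }
        buffered.clear(); // crash: the buffer is gone, nothing reached downstream

        // Restart: resume from the committed cursor, deliver first, then commit.
        int cursor = stream.committedCursor();
        List<Integer> batch;
        while (!(batch = stream.readBatch(cursor, 3)).isEmpty()) {
            downstream.addAll(batch);
            cursor += batch.size();
            stream.commit(cursor);
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println("commit before flush: " + runWithCrash(true));  // [3, 4]
        System.out.println("commit after flush:  " + runWithCrash(false)); // [0, 1, 2, 3, 4]
    }
}
```

In this model the early commit drops every buffered offset, while deferring the
commit lets them be re-read after the restart. In Beam terms, the commit would
presumably have to be tied to the point where the runner has durably committed
the bundle. `DoFn.BundleFinalizer` with `afterBundleCommit` looks like a
candidate hook for that, though whether the direct runner honours it in 2.25.0
is an open assumption here.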

Is this the expected behavior of `outputReceiver.output()`? If so, how can we
properly commit the messages (or checkpoint) in the connector so that downstream
does not lose messages when the pipeline starts over?

Thanks,
Yu
 

> On Jan 26, 2021, at 10:13, Boyuan Zhang <boyu...@google.com> wrote:
> 
> +dev <mailto:d...@beam.apache.org> 
> 
> Hi Yu,
> Which runner are you using for your pipeline? It would also be helpful to
> share your pipeline code.
> 
> On Mon, Jan 25, 2021 at 10:19 PM <yu.b.zh...@oracle.com 
> <mailto:yu.b.zh...@oracle.com>> wrote:
> Hi Beam Community,
> 
> I have a splittable `DoFn` that reads messages from a stream and outputs the
> results downstream. The pseudocode looks like this:
> @DoFn.ProcessElement
> public DoFn.ProcessContinuation processElement(@DoFn.Element SourceDescriptor sourceDescriptor,
>                                                RestrictionTracker<OffsetRange, Long> tracker,
>                                                WatermarkEstimator watermarkEstimator,
>                                                DoFn.OutputReceiver<Record> receiver) throws Exception {
>     while(true){
>         messages = getMessageFromStream();
>         if (messages.isEmpty()) {
>             return DoFn.ProcessContinuation.resume();
>         }
>         for(message: messages){
>             if (!tracker.tryClaim(message)) {
>                 return DoFn.ProcessContinuation.stop();
>             }
>             record = Record(message);
>             receiver.outputWithTimestamp(record, message.getTimestamp());
>         }
>     }
> }
> 
> I expected to see the output downstream immediately, but the results are
> grouped into batches (of 4 or 5 outputs) before being emitted downstream. Is
> this batch size configurable in the `DoFn` or the runner?
> 
> Thanks for any answer,
> Yu
>  
> 
> 
