Hi Boyuan,
Thanks for replying. We are using Beam 2.25.0 and the direct runner for testing.
We are trying to develop an unbounded streaming connector as a splittable
DoFn. In our Connector.read(), we want to commit the message back to the stream
after outputting the record to the downstream user pipeline. The read transform
and user pipeline look like this:
public class Connector {

  public static Connector.Read read() {
    return new AutoValue_Connector_Read.Builder()
        .setStream("")
        .setStreamPartitions(Collections.singletonList(0))
        .build();
  }

  @AutoValue
  public abstract static class Read extends PTransform<PBegin, PCollection<Record>> {

    @Override
    public PCollection<Record> expand(PBegin input) {
      PCollection<SourceDescriptor> output = input.getPipeline()
          .apply(Impulse.create())
          .apply(ParDo.of(new GenerateSourceDescriptor(this)));
      // then apply the SDF read DoFn on it
      return output.apply(ParDo.of(new ReadDoFn(this)));
    }
  }
}
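For context, the `SourceDescriptor` and `GenerateSourceDescriptor` referenced above
look roughly like the following (simplified sketch; the field names and the
`getStream()`/`getStreamPartitions()` getters are placeholders for our actual classes):

// Simplified sketch: the real classes carry more configuration.
class SourceDescriptor implements Serializable {
  final String stream;
  final int partition;

  SourceDescriptor(String stream, int partition) {
    this.stream = stream;
    this.partition = partition;
  }
}

class GenerateSourceDescriptor extends DoFn<byte[], SourceDescriptor> {
  private final Connector.Read spec;

  GenerateSourceDescriptor(Connector.Read spec) {
    this.spec = spec;
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    // Emit one descriptor per configured partition of the stream.
    for (int partition : spec.getStreamPartitions()) {
      context.output(new SourceDescriptor(spec.getStream(), partition));
    }
  }
}

The SDF read DoFn itself looks like this: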
@DoFn.UnboundedPerElement
class ReadDoFn extends DoFn<SourceDescriptor, Record> {

  @ProcessElement
  public ProcessContinuation processElement(
      @Element SourceDescriptor sourceDescriptor,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<Record> receiver) {
    while (true) {
      List<Message> messages = getMessageFromStream(cursor);
      if (messages.isEmpty()) {
        return DoFn.ProcessContinuation.resume();
      }
      for (Message message : messages) {
        if (!tracker.tryClaim(message.getOffset())) {
          return DoFn.ProcessContinuation.stop();
        }
        Record record = new Record(message);
        // output to user pipeline
        receiver.outputWithTimestamp(record, Instant.now());
      }
      // commit this batch of messages and get the updated cursor to read the
      // next batch of messages
      cursor = commitMessage();
    }
  }
}
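For completeness, the remaining SDF wiring on `ReadDoFn` (these methods live inside
the class) looks roughly like the following simplified sketch. For the truly
unbounded case we expect to replace the plain `OffsetRangeTracker` with a
`GrowableOffsetRangeTracker` plus an end-offset estimator, and `fetchStartOffset()`
is a placeholder for however we resolve the starting position:

// Restriction: offsets from the starting position to "infinity".
@GetInitialRestriction
public OffsetRange getInitialRestriction(@Element SourceDescriptor sourceDescriptor) {
  // fetchStartOffset() is a placeholder for looking up the committed cursor.
  return new OffsetRange(fetchStartOffset(sourceDescriptor), Long.MAX_VALUE);
}

@NewTracker
public OffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
  return new OffsetRangeTracker(restriction);
}

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
  return currentElementTimestamp;
}

@NewWatermarkEstimator
public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
    @WatermarkEstimatorState Instant watermarkEstimatorState) {
  return new WatermarkEstimators.MonotonicallyIncreasing(watermarkEstimatorState);
}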
//////////////////////////////// pipeline uses Connector.read() to read from stream ////////////////////////////////
class UserPipeline {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);
    p.getOptions().as(StreamingOptions.class).setStreaming(true);

    PCollection<KV<String, String>> output =
        p.apply("Read Stream", Connector.read().setStream("stream1"))
         .apply("Log Record", ParDo.of(new DoFn<Record, KV<String, String>>() {
           @ProcessElement
           public void processElement(@Element Record input,
                                      OutputReceiver<KV<String, String>> out) {
             System.out.printf("[User Pipeline] received offset %s : %s : %s\n",
                 input.getOffset(), input.getKV().getKey(), input.getKV().getValue());
             out.output(input.getKV());
           }
         }));

    p.run();
  }
}
Since we commit the messages after `outputReceiver.output()`, and use the cursor
from the commit response to fetch the next messages, a problem arises if
`outputReceiver.output()` does not emit immediately but buffers, say, messages
0, 1, 2: if the user pipeline stops and restarts, messages 0 and 1 are lost,
because `outputReceiver.output()` has not emitted them downstream, yet they have
already been committed in the connector.
Is this the expected behavior of `outputReceiver.output()`? If so, how can we
properly commit the messages / checkpoint in the connector so that downstream
will not lose messages when starting over?
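One alternative we have been considering is to stop committing inline in the read
loop and instead register the commit as a bundle finalization callback, so the
stream is only acknowledged after the runner reports that the bundle's output has
been durably committed. A rough sketch of what we have in mind (assuming `Message`
exposes `getOffset()` and that `commitMessage()` can take an explicit offset;
`advanceCursor()` and the offset-taking `commitMessage()` are placeholders for our
actual API):

@ProcessElement
public ProcessContinuation processElement(
    @Element SourceDescriptor sourceDescriptor,
    RestrictionTracker<OffsetRange, Long> tracker,
    OutputReceiver<Record> receiver,
    BundleFinalizer bundleFinalizer) {
  List<Message> messages = getMessageFromStream(cursor);
  if (messages.isEmpty()) {
    return DoFn.ProcessContinuation.resume();
  }
  long lastOffset = -1;
  for (Message message : messages) {
    if (!tracker.tryClaim(message.getOffset())) {
      return DoFn.ProcessContinuation.stop();
    }
    receiver.outputWithTimestamp(new Record(message), Instant.now());
    lastOffset = message.getOffset();
  }
  // Advance the local read position from the offsets we claimed, instead of
  // from the commit response (advanceCursor() is a placeholder).
  cursor = advanceCursor(lastOffset);
  final long offsetToCommit = lastOffset;
  // Only acknowledge the messages on the stream once the runner reports that
  // this bundle's output has been durably committed.
  bundleFinalizer.afterBundleCommit(
      Instant.now().plus(Duration.standardMinutes(5)),
      () -> commitMessage(offsetToCommit));  // placeholder overload taking an offset
  return DoFn.ProcessContinuation.resume();
}

Returning `resume()` after each batch instead of looping forever would also, if we
understand the model correctly, give the runner more frequent chances to checkpoint
and finalize bundles. Would bundle finalization be the recommended way to handle
this, or is there a better pattern for SDF-based connectors?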
Thanks,
Yu
> On Jan 26, 2021, at 10:13, Boyuan Zhang <[email protected]> wrote:
>
> +dev [email protected]
>
> Hi Yu,
> Which runner are you using for your pipeline? Also it would be helpful to
> share your pipeline code as well.
>
> On Mon, Jan 25, 2021 at 10:19 PM <[email protected]> wrote:
> Hi Beam Community,
>
> I have a splittable `DoFn` that reads messages from some stream and outputs the
> results downstream. The pseudocode looks like:
> @DoFn.ProcessElement
> public DoFn.ProcessContinuation processElement(
>     @DoFn.Element SourceDescriptor sourceDescriptor,
>     RestrictionTracker<OffsetRange, Long> tracker,
>     WatermarkEstimator watermarkEstimator,
>     DoFn.OutputReceiver<Record> receiver) throws Exception {
>   while (true) {
>     messages = getMessageFromStream();
>     if (messages.isEmpty()) {
>       return DoFn.ProcessContinuation.resume();
>     }
>     for (message : messages) {
>       if (!tracker.tryClaim(message)) {
>         return DoFn.ProcessContinuation.stop();
>       }
>       record = Record(message);
>       receiver.outputWithTimestamp(record, message.getTimestamp());
>     }
>   }
> }
>
> I expected to see the output downstream immediately, but the results are
> grouped into batches (4-5 outputs) and emitted downstream. Is this size
> configurable in the `DoFn` or the runner?
>
> Thanks for any answer,
> Yu
>
>
>