Hi Gurus,

I'm trying to create an IO connector in Beam that fetches data from a
socket server. I'm new to Beam, but according to this blog post
<https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
that SDF (Splittable DoFn) is now the recommended way to implement an IO
connector.

This in-house socket server accepts multiple clients, but it only sends
messages to the first connected client. It starts sending messages to the
second client only after the first one disconnects.
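
To make that delivery rule concrete, here is a minimal in-memory sketch of
the behaviour described above. It is only a model, not the real server: the
names FirstClientOnlyServer, connect, disconnect and send are made up for
illustration, and no sockets are involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of the server's delivery rule (not the real server). */
final class FirstClientOnlyServer {

    /** A connected client; records every message the server delivered to it. */
    static final class Client {
        public final String name;
        public final List<String> received = new ArrayList<>();

        private Client(String name) {
            this.name = name;
        }
    }

    // Clients in connection order; only the head of the queue receives messages.
    private final Deque<Client> clients = new ArrayDeque<>();

    /** Registers a new client behind all clients that connected earlier. */
    public Client connect(String name) {
        Client client = new Client(name);
        clients.addLast(client);
        return client;
    }

    /** Removes a client; if it was the head, the next client becomes the receiver. */
    public void disconnect(Client client) {
        clients.remove(client);
    }

    /** Delivers the message to the earliest-connected client that is still present. */
    public void send(String message) {
        Client active = clients.peekFirst();
        if (active != null) {
            active.received.add(message);
        }
    }

    public static void main(String[] args) {
        FirstClientOnlyServer server = new FirstClientOnlyServer();
        Client first = server.connect("first");
        Client second = server.connect("second");

        server.send("m1");            // only "first" is served
        server.disconnect(first);
        server.send("m2");            // now "second" is served

        System.out.println(first.name + " received " + first.received);
        System.out.println(second.name + " received " + second.received);
    }
}
```

In this model, a second connection opened while the first is still alive
receives nothing, which is why it matters to me how many connections the
pipeline ends up holding.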

To understand the lifecycle of a DoFn, I created a very simple DoFn
subclass that calls LOG.debug() in every method. According to the JavaDoc
of DoFn.Setup, "This is a good place to initialize transient in-memory
resources, such as network connections. The resources can then be disposed
in DoFn.Teardown." So I guess I should create the connection to the socket
server in the setup() method.

But based on the log messages below, even though the input PCollection has
only one element, Beam still creates multiple DemoIO instances and invokes
a different DemoIO instance after every checkpoint.

I'm wondering:
1. How could I let Beam create only one DemoIO instance, or at least use
the same instance constantly?
2. Or should I use the Source API for this purpose?
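
For question 1, one possible workaround (if a single instance cannot be
guaranteed) might be to share one connection per JVM across all DemoIO
instances. Below is a minimal sketch under that assumption. SharedConnection,
acquire and release are hypothetical names, not Beam API; the Beam hooks
appear only in comments, and the idea only helps when every instance runs in
the same JVM, as with the direct runner.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

/** Hypothetical JVM-wide holder: several DoFn instances share one connection. */
final class SharedConnection {
    private static Closeable connection; // the shared resource, null while closed
    private static int users;            // how many instances currently hold it

    private SharedConnection() {}

    /** Intended to be called from @Setup: opens on first use, otherwise reuses. */
    static synchronized Closeable acquire(Supplier<? extends Closeable> opener) {
        if (users == 0) {
            connection = opener.get();
        }
        users++;
        return connection;
    }

    /** Intended to be called from @Teardown: closes when the last holder leaves. */
    static synchronized void release() {
        if (users == 0) {
            return; // nothing is held, so there is nothing to close
        }
        users--;
        if (users == 0) {
            Closeable toClose = connection;
            connection = null;
            try {
                toClose.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for opening the real socket; prints instead of connecting.
        Supplier<Closeable> opener = () -> {
            System.out.println("opening connection");
            return () -> System.out.println("closing connection");
        };

        Closeable a = acquire(opener); // first instance opens the connection
        Closeable b = acquire(opener); // second instance reuses it
        System.out.println("same connection: " + (a == b));
        release();                     // still held by one instance
        release();                     // last holder closes it
    }
}
```

I'm not sure this is the right approach: if every holder releases before the
next instance is set up, the connection would still be closed and reopened.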

Thanks in advance.

Logs:
07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!

WindowedWordCountSDF.java

Pipeline pipeline = Pipeline.create(options);
List<String> LINES = Arrays.asList("First");
PCollection<String> input =
    pipeline
            .apply(Create.of(LINES))
            .apply(ParDo.of(new DemoIO()));
...


DemoIO.java

public class DemoIO extends DoFn<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);

    public DemoIO(){
        super();
        LOG.debug("{}->new DemoIO() is called!", this);
    }

    @ProcessElement
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
        LOG.debug("{}->process({}) is called!", this, tracker);

        for (long i = tracker.currentRestriction().getFrom();
                tracker.tryClaim(i); ++i) {
            sleep(500);
            c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
        }
        LOG.debug("{}->process({}) end!", this, tracker);
    }

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(String input) {
        LOG.debug("{}->getInitialRestriction() is called!", input);
        return new OffsetRange(0, Long.MAX_VALUE);
//        return new OffsetRange(0, 100);
    }

    @NewTracker
    public OffsetRangeTracker newTracker(OffsetRange range) {
        LOG.debug("{}->newTracker() is called!", range);
        return new OffsetRangeTracker(range);
    }

    @Setup
    public void setup(){
        LOG.debug("{}->setup() is called!", this);
    }

    @StartBundle
    public void startBundle(){
        LOG.debug("{}->startBundle() is called!", this);
    }
