Hi Gurus,

I'm trying to create an IO connector in Beam to fetch data from a socket server. I'm new to Beam, but according to this blog post <https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, SDF (Splittable DoFn) seems to be the recommended way to implement an IO connector now.
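The restriction/tracker idea from that blog post is what produces the checkpoint pattern in the logs of this post. The process() loop claims offsets one at a time. When the runner checkpoints, the tracker keeps only what was already claimed, and the rest of the range goes back to the runner as a new restriction. Below is a rough, stdlib-only Java model of that behavior. MiniOffsetTracker is a hypothetical class, not Beam's OffsetRangeTracker. Its split rule ("split right after the last claimed offset") is an assumption inferred from the logged ranges, such as [0, 2) being followed by [2, 9223372036854775807).

```java
import java.util.Arrays;

// Hypothetical, simplified model of an offset-range restriction tracker.
// NOT Beam's OffsetRangeTracker: it only mimics the claim/checkpoint
// behavior that is visible in the DirectRunner logs of this post.
final class MiniOffsetTracker {
    private final long from;
    private long to;                 // exclusive upper bound of the current range
    private Long lastClaimed = null; // last offset successfully claimed
    private Long lastAttempted = null;

    MiniOffsetTracker(long from, long to) {
        this.from = from;
        this.to = to;
    }

    // Records the attempt and succeeds only while i is inside the current range.
    boolean tryClaim(long i) {
        lastAttempted = i;
        if (i >= to) {
            return false;
        }
        lastClaimed = i;
        return true;
    }

    // Shrinks this range to the claimed prefix and returns the rest as {from, to}.
    long[] checkpoint() {
        long splitAt = (lastClaimed == null) ? from : lastClaimed + 1;
        long[] residual = {splitAt, to};
        to = splitAt;
        return residual;
    }

    long[] currentRange() {
        return new long[] {from, to};
    }

    public static void main(String[] args) {
        MiniOffsetTracker t = new MiniOffsetTracker(0, Long.MAX_VALUE);
        t.tryClaim(0);
        t.tryClaim(1);
        // The runner decides to checkpoint at this point.
        long[] residual = t.checkpoint();
        // The loop in process() then tries offset 2, which is now outside [0, 2).
        boolean claimed = t.tryClaim(2);
        System.out.println("primary=" + Arrays.toString(t.currentRange())
                + " residual=" + Arrays.toString(residual)
                + " tryClaim(2)=" + claimed);
    }
}
```

Running main prints `primary=[0, 2] residual=[2, 9223372036854775807] tryClaim(2)=false` (Arrays.toString shows the exclusive end as `]`). This matches the first process() call in the logs (range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2). Because the residual is scheduled as a separate piece of work and Beam does not promise which DoFn instance runs it, this is consistent with a new DemoIO instance appearing after each checkpoint.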
This in-house socket server can accept multiple clients, but it only sends messages to the first connected client; it starts sending to the second client only after the first one disconnects.

To understand the lifecycle of a DoFn, I created a very simple DoFn subclass that calls LOG.debug() in every method. According to the JavaDoc of DoFn.Setup, "This is a good place to initialize transient in-memory resources, such as network connections. The resources can then be disposed in DoFn.Teardown." So I guess I should create the connection to the socket server in the setup() method. But the log messages below show that even though the input PCollection has only one element, Beam still creates multiple DemoIO instances and invokes a different DemoIO instance after every checkpoint.

I'm wondering:
1. How can I make Beam create only one DemoIO instance, or at least keep using the same instance?
2. Or should I use the Source API for this purpose instead?

Thanks in advance.

Logs:
07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!

WindowedWordCountSDF.java

Pipeline pipeline = Pipeline.create(options);
List<String> LINES = Arrays.asList("First");
PCollection<String> input = pipeline
    .apply(Create.of(LINES))
    .apply(ParDo.of(new DemoIO()));
...

DemoIO.java

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DemoIO extends DoFn<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);

    public DemoIO() {
        super();
        LOG.debug("{}->new DemoIO() is called!", this);
    }

    @ProcessElement
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
        LOG.debug("{}->process({}) is called!", this, tracker);
        // Claim offsets one by one; tryClaim() returns false once the runner
        // has checkpointed and the current range no longer contains i.
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
            sleep(500); // sleep(long) is a helper not shown in this excerpt
            c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
        }
        LOG.debug("{}->process({}) end!", this, tracker);
    }

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(String input) {
        LOG.debug("{}->getInitialRestriction() is called!", input);
        return new OffsetRange(0, Long.MAX_VALUE);
        // return new OffsetRange(0, 100);
    }

    @NewTracker
    public OffsetRangeTracker newTracker(OffsetRange range) {
        LOG.debug("{}->newTracker() is called!", range);
        return new OffsetRangeTracker(range);
    }

    @Setup
    public void setup() {
        LOG.debug("{}->setup() is called!", this);
    }

    @StartBundle
    public void startBundle() {
        LOG.debug("{}->startBundle() is called!", this);
    }
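Because Beam may create several DemoIO instances, one way to keep a single socket connection per worker JVM (rather than per instance) is to hold the connection in a static, reference-counted holder. Each instance acquires it in @Setup and releases it in @Teardown. Below is a minimal stdlib-only sketch of that pattern. SharedResource is a hypothetical helper, not a Beam API, and the demo uses a dummy AutoCloseable in place of a real java.net.Socket. HOST and PORT in the comment are placeholders.

```java
import java.util.function.Supplier;

// Hypothetical helper (not a Beam API): one lazily created, reference-counted
// resource per JVM/class loader, shared by every DoFn instance on that worker.
//
// Intended use inside a DoFn such as DemoIO (HOST and PORT are placeholders):
//   private static final SharedResource<Socket> CONN = new SharedResource<>(() -> {
//       try { return new Socket(HOST, PORT); }
//       catch (IOException e) { throw new UncheckedIOException(e); }
//   });
//   in @Setup:    socket = CONN.acquire();
//   in @Teardown: CONN.release();
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T resource;   // null while nobody holds a reference
    private int refCount; // acquire() calls not yet matched by release()

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    // Returns the shared resource, creating it on the first acquire.
    synchronized T acquire() {
        if (refCount == 0) {
            resource = factory.get();
        }
        refCount++;
        return resource;
    }

    // Drops one reference; closes the resource when the last holder releases it.
    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            try {
                resource.close();
            } catch (Exception e) {
                throw new RuntimeException("failed to close shared resource", e);
            } finally {
                resource = null;
            }
        }
    }

    public static void main(String[] args) {
        int[] opened = {0};
        SharedResource<AutoCloseable> shared = new SharedResource<>(() -> {
            opened[0]++;
            return () -> System.out.println("closed");
        });
        // Two different DoFn instances calling acquire() from their @Setup methods.
        AutoCloseable first = shared.acquire();
        AutoCloseable second = shared.acquire();
        System.out.println("same=" + (first == second) + " opened=" + opened[0]);
        shared.release();
        shared.release(); // the last release closes the resource
    }
}
```

Running main prints `same=true opened=1` and then `closed`. Some caveats with this design: static state is per JVM, so a runner with several workers would still open several connections, which matters for a server that only talks to the first client. The DoFn JavaDoc describes @Teardown as best effort, so release() may never run. If every instance happens to be torn down between checkpoints, the count drops to zero and the connection is reopened, which this server would see as a new client. A holder that never closes the connection avoids that at the cost of leaking it.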