Hi Lukasz,

This socket server works like an MQTT server: it keeps queues internally, and
the client must ack each message.

> Is receiving and processing each message reliably important or is it ok
> to drop messages when things fail?
A: Reliability is important; no messages should be lost.

> Is there a message acknowledgement system or can you request a position
> within the message stream (e.g. send all messages from position X when
> connecting and if for whatever reason you need to reconnect you can say
> send messages from position X to replay past messages)?
A: The client should ack each message it receives, and the server deletes
each acked message. If the client breaks and the server does not receive an
ack, the server re-sends the message to the client once it is online
again. There is no "position" concept like in Kafka.
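
To illustrate the ack behavior described above, here is a minimal in-memory
sketch in Java. It is only a model of the semantics, not the real server code:
the class AckQueueSketch and its methods enqueue, deliverOnConnect and ack are
hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the server's ack behavior (assumption, not the real server). */
class AckQueueSketch {
    // Messages that were enqueued but not yet acked, kept in arrival order.
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextId = 0;

    /** Server side: keep the message until the client acks it. */
    long enqueue(String payload) {
        long id = nextId++;
        unacked.put(id, payload);
        return id;
    }

    /** On every (re)connect, everything that is still unacked is sent again. */
    List<String> deliverOnConnect() {
        return new ArrayList<>(unacked.values());
    }

    /** The client acked the message, so the server deletes it. */
    void ack(long id) {
        unacked.remove(id);
    }

    public static void main(String[] args) {
        AckQueueSketch server = new AckQueueSketch();
        long first = server.enqueue("m1");
        long second = server.enqueue("m2");

        // First connection: the client receives both messages, acks only m1, then breaks.
        System.out.println(server.deliverOnConnect()); // [m1, m2]
        server.ack(first);

        // Reconnect: only the unacked message is sent again.
        System.out.println(server.deliverOnConnect()); // [m2]
        server.ack(second);

        // Nothing is left once every message has been acked.
        System.out.println(server.deliverOnConnect()); // []
    }
}
```

Because an unacked message is sent again after a reconnect, the client can see
the same message more than once, so delivery is at-least-once.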


On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <lc...@google.com> wrote:

> Before getting into what you could use and the current state of
> SplittableDoFn and its supported features, I was wondering what reliability
> guarantees the socket server has around messages.
>
> Is receiving and processing each message reliably important or is it ok to
> drop messages when things fail?
> Is there a message acknowledgement system or can you request a position
> within the message stream (e.g. send all messages from position X when
> connecting and if for whatever reason you need to reconnect you can say
> send messages from position X to replay past messages)?
>
>
>
>
> On Tue, Sep 18, 2018 at 5:00 PM flyisland <fly.isl...@gmail.com> wrote:
>
>>
>> Hi Gurus,
>>
>> I'm trying to create an IO connector to fetch data from a socket server
>> in Beam. I'm new to Beam, but according to this blog post <
>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
>> that SDF is now the recommended way to implement an IO connector.
>>
>> This in-house socket server accepts multiple clients, but only sends
>> messages to the first connected client. It sends messages to the second
>> client only after the first one disconnects.
>>
>> To understand the lifecycle of a DoFn, I've created a very simple DoFn
>> subclass that calls log.debug() in every method. According to the JavaDoc
>> of DoFn.Setup(), "This is a good place to initialize transient
>> in-memory resources, such as network connections. The resources can then be
>> disposed in DoFn.Teardown." So I guess I should create the connection to the
>> socket server in the setup() method.
>>
>> But based on the log messages below, even though the input PCollection has
>> only one element, Beam still creates multiple DemoIO instances and invokes
>> a different DemoIO instance after every checkpoint.
>>
>> I'm wondering:
>> 1. How can I make Beam create only one DemoIO instance, or at least use
>> the same instance consistently?
>> 2. Or should I use the Source API for this purpose?
>>
>> Thanks in advance.
>>
>> Logs:
>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
>> First->getInitialRestriction() is called!
>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
>> 9223372036854775807)->newTracker() is called!
>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>> called!
>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:56.285Z -> 0 -> First
>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:56.786Z -> 1 -> First
>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
>> 9223372036854775807)->newTracker() is called!
>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>> called!
>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>> 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:57.354Z -> 2 -> First
>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@142109e->setup() is called!
>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:57.856Z -> 3 -> First
>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:58.358Z -> 4 -> First
>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5,
>> 9223372036854775807)->newTracker() is called!
>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>> called!
>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>> 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>
>> WindowedWordCountSDF.java
>>
>> Pipeline pipeline = Pipeline.create(options);
>> List<String> LINES = Arrays.asList("First");
>> PCollection<String> input =
>>     pipeline
>>             .apply(Create.of(LINES))
>>             .apply(ParDo.of(new DemoIO()));
>> ...
>>
>>
>> DemoIO.java
>>
>> public class DemoIO extends DoFn<String, String> {
>>     private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>
>>     public DemoIO(){
>>         super();
>>         LOG.debug("{}->new DemoIO() is called!", this);
>>     }
>>
>>     @ProcessElement
>>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>         LOG.debug("{}->process({}) is called!", this, tracker);
>>
>>         for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>             sleep(500);
>>             c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>         }
>>         LOG.debug("{}->process({}) end!", this, tracker);
>>     }
>>
>>     @GetInitialRestriction
>>     public OffsetRange getInitialRestriction(String input) {
>>         LOG.debug("{}->getInitialRestriction() is called!", input);
>>         return new OffsetRange(0, Long.MAX_VALUE);
>> //        return new OffsetRange(0, 100);
>>     }
>>
>>     @NewTracker
>>     public OffsetRangeTracker newTracker(OffsetRange range) {
>>         LOG.debug("{}->newTracker() is called!", range);
>>         return new OffsetRangeTracker(range);
>>     }
>>
>>     @Setup
>>     public void setup(){
>>         LOG.debug("{}->setup() is called!", this);
>>     }
>>
>>     @StartBundle
>>     public void startBundle(){
>>         LOG.debug("{}->startBundle() is called!", this);
>>     }
>>
>>
>>
>>
