Hi Lukasz,

With the API we currently provide, messages cannot be acked from a
different client.

The server re-sends any unacked messages to the client once it reconnects,
so there will be duplicate messages, but each redelivered message carries a
"redeliver times" property in its header.
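
To make the duplicate handling concrete, here is a rough sketch in plain
Java (no Beam classes) of how a client could filter redelivered messages.
The Message class, its unique "id" field and the RedeliveryFilter class are
all assumptions made up for this example; only the "redeliver times" header
comes from our server. It remembers a bounded number of recently seen ids,
which is just one possible approach.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical message shape: an assumed unique id plus the "redeliver times" header. */
final class Message {
    final String id;
    final int redeliverTimes;
    final String payload;

    Message(String id, int redeliverTimes, String payload) {
        this.id = id;
        this.redeliverTimes = redeliverTimes;
        this.payload = payload;
    }
}

/** Drops messages whose id was seen recently; remembers at most maxIds ids (LRU eviction). */
final class RedeliveryFilter {
    private final Set<String> seen;

    RedeliveryFilter(final int maxIds) {
        // Access-ordered LinkedHashMap that evicts the least recently used id when full.
        this.seen = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > maxIds;
                }
            });
    }

    /** Returns true if the message id is new (process it), false if it is a known duplicate. */
    boolean accept(Message m) {
        // Set.add returns false when the id is already present.
        return seen.add(m.id);
    }
}
```

The bound on remembered ids is a trade-off: an id that has been evicted
would be accepted again if the server redelivered it much later.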

Thanks for the helpful information. I'll look into UnboundedSources.
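
As I understand the finalization callback idea, the acks would be sent only
once a checkpoint is finalized. Below is a rough plain-Java sketch of that
shape, not the actual Beam API: SocketClient and AckOnFinalizeCheckpoint are
hypothetical names, and the finalizeCheckpoint() method only mirrors the
method of the same name on UnboundedSource.CheckpointMark. Because acks
cannot come from a different client, the sketch keeps a reference to the
client that received the messages.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client interface; only an ack-by-id call is assumed. */
interface SocketClient {
    void ack(String messageId) throws IOException;
}

/** Holds the ids read since the last checkpoint and acks them when finalized. */
final class AckOnFinalizeCheckpoint {
    private final SocketClient client;
    private final List<String> pendingIds;

    AckOnFinalizeCheckpoint(SocketClient client, List<String> pendingIds) {
        this.client = client;
        // Copy the list so later reads do not change this checkpoint.
        this.pendingIds = new ArrayList<>(pendingIds);
    }

    /** Acks every pending id through the client that received it, then forgets them. */
    void finalizeCheckpoint() throws IOException {
        for (String id : pendingIds) {
            client.ack(id);
        }
        pendingIds.clear();
    }
}
```

If the connection is lost before finalization, nothing is acked and the
server re-sends those messages, which is where the duplicates come from.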



On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik <lc...@google.com> wrote:

> Are duplicate messages ok?
>
> Can you ack messages from a different client or are messages sticky to a
> single client (e.g. if one client loses connection, when it reconnects can
> it ack messages it received or are those messages automatically replayed)?
>
> UnboundedSources are the only current "source" type that supports the
> finalization callbacks[1] you will need to ack messages, as well as
> deduplication[2]. SplittableDoFn will support both of these features but
> is not there yet.
>
> 1:
> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
> 2:
> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93
>
>
> On Wed, Sep 19, 2018 at 8:31 PM flyisland <fly.isl...@gmail.com> wrote:
>
>> Hi Lukasz,
>>
>> This socket server is like an MQTT server: it has queues inside it, and
>> the client should ack each message.
>>
>> > Is receiving and processing each message reliably important or is it ok
>> to drop messages when things fail?
>> A: Reliability is important; no messages should be lost.
>>
>> > Is there a message acknowledgement system or can you request a position
>> within the message stream (e.g. send all messages from position X when
>> connecting and if for whatever reason you need to reconnect you can say
>> send messages from position X to replay past messages)?
>> A: The client should ack each message it receives, and the server deletes
>> acked messages. If the client crashes and the server does not receive an
>> ack, the server re-sends the message once the client is online again.
>> There is no "position" concept like in Kafka.
>>
>>
>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Before getting into what you could use and the current state of
>>> SplittableDoFn and its supported features, I was wondering what reliability
>>> guarantees the socket server has around messages.
>>>
>>> Is receiving and processing each message reliably important or is it ok
>>> to drop messages when things fail?
>>> Is there a message acknowledgement system or can you request a position
>>> within the message stream (e.g. send all messages from position X when
>>> connecting and if for whatever reason you need to reconnect you can say
>>> send messages from position X to replay past messages)?
>>>
>>>
>>>
>>>
>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland <fly.isl...@gmail.com> wrote:
>>>
>>>>
>>>> Hi Gurus,
>>>>
>>>> I'm trying to create an IO connector for Beam that fetches data from a
>>>> socket server. I'm new to Beam, but according to this blog <
>>>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it
>>>> seems that SDF is the recommended way to implement an IO connector now.
>>>>
>>>> This in-house socket server accepts multiple clients, but only sends
>>>> messages to the first connected client. It sends messages to the second
>>>> client only if the first one disconnects.
>>>>
>>>> To understand the lifecycle of a DoFn, I created a very simple DoFn
>>>> subclass that calls log.debug() in every method. According to the
>>>> JavaDoc of DoFn.Setup, "This is a good place to initialize transient
>>>> in-memory resources, such as network connections. The resources can then be
>>>> disposed in DoFn.Teardown." So I guess I should create the connection to
>>>> the socket server in the setup() method.
>>>>
>>>> But based on the log messages below, even though the input PCollection
>>>> has only one element, Beam still creates multiple DemoIO instances and
>>>> invokes a different DemoIO instance after every checkpoint.
>>>>
>>>> I'm wondering:
>>>> 1. How could I let Beam create only one DemoIO instance, or at least
>>>> use the same instance constantly?
>>>> 2. Or should I use the Source API for this purpose?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Logs:
>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
>>>> First->getInitialRestriction() is called!
>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
>>>> 9223372036854775807)->newTracker() is called!
>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>>>> called!
>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>>>> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>> 2018-09-18T23:15:56.285Z -> 0 -> First
>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>> 2018-09-18T23:15:56.786Z -> 1 -> First
>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
>>>> 9223372036854775807)->newTracker() is called!
>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>>>> called!
>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>>>> 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>> 2018-09-18T23:15:57.354Z -> 2 -> First
>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>> 2018-09-18T23:15:57.856Z -> 3 -> First
>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>> 2018-09-18T23:15:58.358Z -> 4 -> First
>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5,
>>>> 9223372036854775807)->newTracker() is called!
>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>>>> called!
>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>>>> 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO -
>>>> org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>>>
>>>> WindowedWordCountSDF.java
>>>>
>>>> Pipeline pipeline = Pipeline.create(options);
>>>> List<String> LINES = Arrays.asList("First");
>>>> PCollection<String> input =
>>>>     pipeline
>>>>             .apply(Create.of(LINES))
>>>>             .apply(ParDo.of(new DemoIO()));
>>>> ...
>>>>
>>>>
>>>> DemoIO.java
>>>>
>>>> public class DemoIO extends DoFn<String, String> {
>>>>     private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>>>
>>>>     public DemoIO(){
>>>>         super();
>>>>         LOG.debug("{}->new DemoIO() is called!", this);
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>         LOG.debug("{}->process({}) is called!", this, tracker);
>>>>
>>>>         for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>             sleep(500);
>>>>             c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>>>         }
>>>>         LOG.debug("{}->process({}) end!", this, tracker);
>>>>     }
>>>>
>>>>     @GetInitialRestriction
>>>>     public OffsetRange getInitialRestriction(String input) {
>>>>         LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>         return new OffsetRange(0, Long.MAX_VALUE);
>>>> //        return new OffsetRange(0, 100);
>>>>     }
>>>>
>>>>     @NewTracker
>>>>     public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>         LOG.debug("{}->newTracker() is called!", range);
>>>>         return new OffsetRangeTracker(range);
>>>>     }
>>>>
>>>>     @Setup
>>>>     public void setup(){
>>>>         LOG.debug("{}->setup() is called!", this);
>>>>     }
>>>>
>>>>     @StartBundle
>>>>     public void startBundle(){
>>>>         LOG.debug("{}->startBundle() is called!", this);
>>>>     }
>>>>
>>>>
>>>>
>>>>
