Is there any information in the message that could serve as an id for
deduplication?
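
To illustrate why an id matters here: if every message carried a stable id, redelivered copies could be dropped by remembering the ids already seen. Below is a minimal plain-Java sketch of that idea, not Beam code. The class name RecentIdFilter, the method firstTime and the capacity bound are all hypothetical, and it assumes the socket server's messages expose some unique string id, which the thread has not confirmed. In Beam itself, an UnboundedSource can ask the runner to do this by returning true from requiresDeduping() and supplying the id from UnboundedReader.getCurrentRecordId().

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: remembers the most recently seen message ids so that
 * redelivered copies of a message can be recognized and dropped.
 */
final class RecentIdFilter {
    private final Map<String, Boolean> seen;

    RecentIdFilter(final int capacity) {
        // Insertion-ordered map that evicts the oldest id once the bound is
        // exceeded, so memory stays bounded on an unbounded stream.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an id is offered, false for a remembered duplicate. */
    boolean firstTime(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        RecentIdFilter filter = new RecentIdFilter(1000);
        System.out.println(filter.firstTime("msg-1")); // first delivery
        System.out.println(filter.firstTime("msg-1")); // redelivery of the same id
    }
}
```

The bound is a trade-off: a duplicate that arrives after its id has been evicted is not detected, so the capacity would have to cover the longest redelivery gap the server can produce.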

On Thu, Sep 20, 2018 at 6:36 PM flyisland <> wrote:

> Hi Lukasz,
> With the API we currently provide, messages cannot be acked from a
> different client.
> The server re-sends any unacked messages to the reconnected client, so
> there will be duplicate messages, but each carries a "redeliver times"
> property in the header.
> Thanks for the helpful information, I'll look into UnboundedSources!
> On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik <> wrote:
>> Are duplicate messages ok?
>> Can you ack messages from a different client or are messages sticky to a
>> single client (e.g. if one client loses connection, when it reconnects can
>> it ack messages it received or are those messages automatically replayed)?
>> UnboundedSources are currently the only "source" type that supports
>> the finalization callbacks[1] you will need to ack messages, as well as
>> deduplication[2]. SplittableDoFn will support both of these features but
>> is not there yet.
>> 1:
>> 2:
>> On Wed, Sep 19, 2018 at 8:31 PM flyisland <> wrote:
>>> Hi Lukasz,
>>> This socket server is like an MQTT server: it has queues inside it, and
>>> the client should ack each message.
>>> > Is receiving and processing each message reliably important or is it
>>> ok to drop messages when things fail?
>>> A: Reliability is important; no messages should be lost.
>>> > Is there a message acknowledgement system or can you request a
>>> position within the message stream (e.g. send all messages from position X
>>> when connecting and if for whatever reason you need to reconnect you can
>>> say send messages from position X to replay past messages)?
>>> A: The client should ack each message it receives, and the server will
>>> delete the acked message. If the client breaks and the server does not
>>> receive an ack, the server will re-send the message to the client once it
>>> is online again. There is no "position" concept like in Kafka.
>>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <> wrote:
>>>> Before getting into what you could use and the current state of
>>>> SplittableDoFn and its supported features, I was wondering what
>>>> reliability guarantees the socket server has around messages.
>>>> Is receiving and processing each message reliably important or is it ok
>>>> to drop messages when things fail?
>>>> Is there a message acknowledgement system or can you request a position
>>>> within the message stream (e.g. send all messages from position X when
>>>> connecting and if for whatever reason you need to reconnect you can say
>>>> send messages from position X to replay past messages)?
>>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland <> wrote:
>>>>> Hi Gurus,
>>>>> I'm trying to create an IO connector to fetch data from a socket
>>>>> server with Beam. I'm new to Beam, but according to this blog <>, it
>>>>> seems that SDF is the recommended way to implement an IO connector now.
>>>>> This in-house socket server can accept multiple clients, but it only
>>>>> sends messages to the first connected client, and it sends messages to
>>>>> the second client only if the first one disconnects.
>>>>> To understand the lifecycle of a DoFn, I created a very simple DoFn
>>>>> subclass that calls log.debug() in every method. According to the
>>>>> JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
>>>>> in-memory resources, such as network connections. The resources can then
>>>>> be disposed in DoFn.Teardown." So I guess I should create the connection
>>>>> to the socket server in the setup() method.
>>>>> But based on the log messages below, even though the input PCollection
>>>>> has only one element, Beam still creates multiple DemoIO instances and
>>>>> invokes a different DemoIO instance after every checkpoint.
>>>>> I'm wondering:
>>>>> 1. How could I let Beam create only one DemoIO instance, or at least
>>>>> use the same instance constantly?
>>>>> 2. Or should I use the Source API for this purpose?
>>>>> Thanks in advance.
>>>>> Logs:
>>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> First->getInitialRestriction() is called!
>>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
>>>>> 9223372036854775807)->newTracker() is called!
>>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>>>>> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>> 2018-09-18T23:15:56.285Z -> 0 -> First
>>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>> 2018-09-18T23:15:56.786Z -> 1 -> First
>>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
>>>>> 9223372036854775807)->newTracker() is called!
>>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>>>>> 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>> 2018-09-18T23:15:57.354Z -> 2 -> First
>>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>> 2018-09-18T23:15:57.856Z -> 3 -> First
>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>>>>> 2018-09-18T23:15:58.358Z -> 4 -> First
>>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5,
>>>>> 9223372036854775807)->newTracker() is called!
>>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>>>>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5,
>>>>> 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO -
>>>>> org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>> List<String> LINES = Arrays.asList("First");
>>>>> PCollection<String> input =
>>>>>     pipeline
>>>>>             .apply(Create.of(LINES))
>>>>>             .apply(ParDo.of(new DemoIO()));
>>>>> ...
>>>>> public class DemoIO extends DoFn<String, String> {
>>>>>     private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>>>>     public DemoIO(){
>>>>>         super();
>>>>>         LOG.debug("{}->new DemoIO() is called!", this);
>>>>>     }
>>>>>     @ProcessElement
>>>>>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>>         LOG.debug("{}->process({}) is called!", this, tracker);
>>>>>         for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>>             sleep(500);
>>>>>             c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>>>>         }
>>>>>         LOG.debug("{}->process({}) end!", this, tracker);
>>>>>     }
>>>>>     @GetInitialRestriction
>>>>>     public OffsetRange getInitialRestriction(String input) {
>>>>>         LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>>         return new OffsetRange(0, Long.MAX_VALUE);
>>>>> //        return new OffsetRange(0, 100);
>>>>>     }
>>>>>     @NewTracker
>>>>>     public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>>         LOG.debug("{}->newTracker() is called!", range);
>>>>>         return new OffsetRangeTracker(range);
>>>>>     }
>>>>>     @Setup
>>>>>     public void setup(){
>>>>>         LOG.debug("{}->setup() is called!", this);
>>>>>     }
>>>>>     @StartBundle
>>>>>     public void startBundle(){
>>>>>         LOG.debug("{}->startBundle() is called!", this);
>>>>>     }
