Hi Reuven,

There is no explicit ID in the message itself, and whether any information
in it can be used as an ID depends on the use case.
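
As a rough sketch of what "depends on the use case" could mean: if a use
case can treat the payload bytes as identifying a message, an ID could be
derived by hashing the payload. The class and method names below are
hypothetical (they are not part of Beam or of our client API), and the
sketch assumes that two distinct messages never carry an identical payload.
The "redeliver times" header is left out of the hash on purpose, since it
changes on redelivery. In Beam, an ID like this is roughly what an
UnboundedReader would return from getCurrentRecordId() when the source
reports requiresDeduping().

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not a Beam or server API.
class RecordIdSketch {

    // Derives a stable ID from the payload alone. Headers such as the
    // "redeliver times" counter are excluded because they differ between
    // the first delivery and a redelivery of the same message.
    static byte[] recordId(byte[] payload) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(payload);
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be present on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    // Keeps the first message seen for each ID and drops later copies.
    // Assumption: identical payloads always mean "the same message".
    static List<String> dedupe(List<String> payloads) {
        Set<String> seenIds = new HashSet<>();
        List<String> unique = new ArrayList<>();
        for (String payload : payloads) {
            byte[] id = recordId(payload.getBytes(StandardCharsets.UTF_8));
            // byte[] has identity-based equals, so store a string form.
            if (seenIds.add(Base64.getEncoder().encodeToString(id))) {
                unique.add(payload);
            }
        }
        return unique;
    }

    public static void main(String[] args) {
        // The second "a" stands in for a redelivered copy of the first.
        System.out.println(dedupe(Arrays.asList("a", "b", "a"))); // prints [a, b]
    }
}
```

The obvious limitation is that two genuinely different messages with the
same payload would be collapsed into one, so this only fits use cases where
that cannot happen or does not matter.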

On Fri, Sep 21, 2018 at 11:05 AM Reuven Lax <re...@google.com> wrote:

> Is there information in the message that can be used as an id, that can be
> used for deduplication?
>
> On Thu, Sep 20, 2018 at 6:36 PM flyisland <fly.isl...@gmail.com> wrote:
>
>> Hi Lukasz,
>>
>> With the current API we provided, messages cannot be acked from a
>> different client.
>>
>> The server will re-send messages to the reconnected client if those
>> messages were not acked. So there'll be duplicate messages, but with a
>> "redeliver times" property in the header.
>>
>> Thanks for the helpful information; I'll look into UnboundedSource.
>>
>>
>>
>> On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Are duplicate messages ok?
>>>
>>> Can you ack messages from a different client or are messages sticky to a
>>> single client (e.g. if one client loses connection, when it reconnects can
>>> it ack messages it received or are those messages automatically replayed)?
>>>
>>> UnboundedSources are currently the only "source" type that supports
>>> both finalization callbacks[1], which you will need in order to ack
>>> messages, and deduplication[2]. SplittableDoFn will support both of
>>> these features, but it is not there yet.
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>>> 2:
>>> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93
>>>
>>>
>>> On Wed, Sep 19, 2018 at 8:31 PM flyisland <fly.isl...@gmail.com> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> This socket server is similar to an MQTT server: it has internal queues,
>>>> and the client must ack each message.
>>>>
>>>> > Is receiving and processing each message reliably important or is it
>>>> > ok to drop messages when things fail?
>>>> A: Reliability is important; no messages should be lost.
>>>>
>>>> > Is there a message acknowledgement system or can you request a
>>>> > position within the message stream (e.g. send all messages from position X
>>>> > when connecting and if for whatever reason you need to reconnect you can
>>>> > say send messages from position X to replay past messages)?
>>>> A: The client should ack each message it receives, and the server will
>>>> delete the acked message. If the client fails and the server does not
>>>> receive an ack, the server will re-send the message once the client is
>>>> online again. There is no "position" concept like in Kafka.
>>>>
>>>>
>>>> On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Before getting into what you could use and the current state of
>>>>> SplittableDoFn and its supported features, I was wondering what
>>>>> reliability guarantees the socket server has around messages.
>>>>>
>>>>> Is receiving and processing each message reliably important or is it
>>>>> ok to drop messages when things fail?
>>>>> Is there a message acknowledgement system or can you request a
>>>>> position within the message stream (e.g. send all messages from position X
>>>>> when connecting and if for whatever reason you need to reconnect you can
>>>>> say send messages from position X to replay past messages)?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 18, 2018 at 5:00 PM flyisland <fly.isl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi Gurus,
>>>>>>
>>>>>> I'm trying to create an IO connector to fetch data from a socket
>>>>>> server with Beam. I'm new to Beam, but according to this blog post
>>>>>> <https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it
>>>>>> seems that SDF is the recommended way to implement an IO connector now.
>>>>>>
>>>>>> This in-house socket server accepts multiple clients, but only sends
>>>>>> messages to the first connected client; it will send messages to the
>>>>>> second client once the first one has disconnected.
>>>>>>
>>>>>> To understand the lifecycle of a DoFn, I created a very simple DoFn
>>>>>> subclass that calls LOG.debug() in every method. According to the
>>>>>> JavaDoc of DoFn.Setup, "This is a good place to initialize transient
>>>>>> in-memory resources, such as network connections. The resources can then
>>>>>> be disposed in DoFn.Teardown." So I guess I should create the connection
>>>>>> to the socket server in the setup() method.
>>>>>>
>>>>>> But based on the log messages below, even though the input PCollection
>>>>>> has only one element, Beam still creates multiple DemoIO instances and
>>>>>> invokes a different DemoIO instance after every checkpoint.
>>>>>>
>>>>>> I'm wondering:
>>>>>> 1. How can I make Beam create only one DemoIO instance, or at least
>>>>>> keep using the same instance?
>>>>>> 2. Or should I use the Source API for this purpose?
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Logs:
>>>>>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>>>>>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO - First->getInitialRestriction() is called!
>>>>>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>>>>>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>>>>>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>>>>>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0, 9223372036854775807)->newTracker() is called!
>>>>>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>>>>>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>>>>>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>>>>>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.285Z -> 0 -> First
>>>>>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>>>>>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:56.786Z -> 1 -> First
>>>>>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2, 9223372036854775807)->newTracker() is called!
>>>>>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2, 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
>>>>>> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
>>>>>> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.354Z -> 2 -> First
>>>>>> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->setup() is called!
>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:57.856Z -> 3 -> First
>>>>>> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
>>>>>> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF - 2018-09-18T23:15:58.358Z -> 4 -> First
>>>>>> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5, 9223372036854775807)->newTracker() is called!
>>>>>> 07:15:58:375 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is called!
>>>>>> 07:15:59:382 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->process(OffsetRangeTracker{range=[5, 7), lastClaimedOffset=6, lastAttemptedOffset=7}) end!
>>>>>> 07:15:59:385 [direct-runner-worker] [DEBUG] DemoIO - org.apache.beam.examples.DemoIO@142109e->finishBundle() is called!
>>>>>>
>>>>>> WindowedWordCountSDF.java
>>>>>>
>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>> List<String> LINES = Arrays.asList("First");
>>>>>> PCollection<String> input =
>>>>>>     pipeline
>>>>>>             .apply(Create.of(LINES))
>>>>>>             .apply(ParDo.of(new DemoIO()));
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> DemoIO.java
>>>>>>
>>>>>> public class DemoIO extends DoFn<String, String> {
>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(DemoIO.class);
>>>>>>
>>>>>>     public DemoIO(){
>>>>>>         super();
>>>>>>         LOG.debug("{}->new DemoIO() is called!", this);
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>>>         LOG.debug("{}->process({}) is called!", this, tracker);
>>>>>>
>>>>>>         for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>>>             sleep(500);
>>>>>>             c.outputWithTimestamp(i + " -> " + c.element(), Instant.now());
>>>>>>         }
>>>>>>         LOG.debug("{}->process({}) end!", this, tracker);
>>>>>>     }
>>>>>>
>>>>>>     @GetInitialRestriction
>>>>>>     public OffsetRange getInitialRestriction(String input) {
>>>>>>         LOG.debug("{}->getInitialRestriction() is called!", input);
>>>>>>         return new OffsetRange(0, Long.MAX_VALUE);
>>>>>> //        return new OffsetRange(0, 100);
>>>>>>     }
>>>>>>
>>>>>>     @NewTracker
>>>>>>     public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>>>         LOG.debug("{}->newTracker() is called!", range);
>>>>>>         return new OffsetRangeTracker(range);
>>>>>>     }
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setup(){
>>>>>>         LOG.debug("{}->setup() is called!", this);
>>>>>>     }
>>>>>>
>>>>>>     @StartBundle
>>>>>>     public void startBundle(){
>>>>>>         LOG.debug("{}->startBundle() is called!", this);
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
