Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-10-03 Thread flyisland
Hi Raghu,

> Assuming you need to ack on the same connection that served the records,
the finalize() functionality in the UnboundedSource API is important in your
case. You can use the UnboundedSource API for now.

I have a new question now: where should I keep the connection for the later
ack?

Both MqttIO and JmsIO ack messages in the
UnboundedSource.CheckpointMark.finalizeCheckpoint() method, but the Javadoc
says:

>  It is NOT safe to assume the UnboundedSource.UnboundedReader from which
this checkpoint was created still exists at the time this method is called.

I do run into this situation when testing with the Direct Runner: the
"msg.ack()" call fails when finalizeCheckpoint() is called, because the
related reader has already been closed!

Is there any way to ask the runner to call finalizeCheckpoint() before it
closes the Reader?
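One possible approach, sketched below under stated assumptions: keep the connection outside the reader in a process-wide holder, and let the checkpoint mark carry only the message IDs plus a "generation" number identifying the connection they arrived on. At finalize time the mark looks up the live connection; if it has been replaced (closed by the reader, or dropped), the acks are skipped, because the server only accepts acks on the original connection and will redeliver those messages anyway. Connection, ConnectionRegistry, AckingCheckpointMark and tryAck() are hypothetical names, not Beam or server APIs; a real CheckpointMark.finalizeCheckpoint() could delegate to something like tryAck().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for one client connection to the in-house server. */
final class Connection {
  private final long generation;
  private final List<String> acked = new ArrayList<>();
  private boolean open = true;

  Connection(long generation) {
    this.generation = generation;
  }

  long generation() { return generation; }

  synchronized boolean isOpen() { return open; }

  synchronized void close() { open = false; }

  /** Models the server rule: acks are only accepted while this connection is open. */
  synchronized void ack(String messageId) {
    if (!open) {
      throw new IllegalStateException("connection closed");
    }
    acked.add(messageId);
  }

  synchronized List<String> acked() { return new ArrayList<>(acked); }
}

/** Process-wide (static) holder, so the connection is not owned by any single reader. */
final class ConnectionRegistry {
  private static long nextGeneration = 1;
  private static Connection current;

  private ConnectionRegistry() {}

  /** Returns the live connection, opening a new one with a new generation if needed. */
  static synchronized Connection get() {
    if (current == null || !current.isOpen()) {
      current = new Connection(nextGeneration++);
    }
    return current;
  }

  /** Returns the live connection only if it is still the one with this generation. */
  static synchronized Connection ifStillGeneration(long generation) {
    boolean same = current != null && current.isOpen() && current.generation() == generation;
    return same ? current : null;
  }
}

/** What a checkpoint mark could carry: IDs and a connection generation, not the reader. */
final class AckingCheckpointMark {
  private final long generation;
  private final List<String> messageIds;

  AckingCheckpointMark(long generation, List<String> messageIds) {
    this.generation = generation;
    this.messageIds = Collections.unmodifiableList(new ArrayList<>(messageIds));
  }

  /** Acks all IDs; returns false if the original connection is gone (server will redeliver). */
  boolean tryAck() {
    Connection conn = ConnectionRegistry.ifStillGeneration(generation);
    if (conn == null) {
      return false;
    }
    try {
      for (String id : messageIds) {
        conn.ack(id);
      }
      return true;
    } catch (IllegalStateException closedMeanwhile) {
      return false;
    }
  }
}
```

The design choice is to drop stale acks rather than retry them on a new connection: since acks from a different client are rejected, skipping them only means those messages come back as redeliveries, which then need deduplication downstream.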



Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-22 Thread flyisland
Hi Raghu,

Thanks very much! Yes, I think I should focus on the UnboundedSource API for
now.


Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-21 Thread Raghu Angadi
> This in-house built socket server could accept multiple clients, but only
send messages to the first-connected client, and will send messages to the
second client if the first one disconnected.

The fact that the server sends messages only to the first client connection
is quite critical. Even if you use the Source API, which honors the 'Setup()'
JavaDoc, it is not enough in your case. Note that the JavaDoc says instances
are reused, but that guarantees neither a single DoFn instance nor when
Teardown() is actually called; it is on a best-effort basis. The work could
move to a different worker while the DoFn instance on the earlier worker
lives on for a long time. So if you held the connection to the server until
Teardown() is called, you could inadvertently block reads from the DoFn on
the new worker. If you want to keep the connection open across bundles, you
need some way to close an idle connection asynchronously (alternatively, your
service could have a timeout that closes idle client connections, which is
much better). Since you can't afford to wait until Teardown(), you might as
well have a singleton connection that gets closed after some idle time.
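A minimal plain-Java sketch of the "singleton connection that gets closed after some idle time" idea. IdleClosingHolder is a hypothetical helper, not a Beam class: the opener would create the socket connection, and a single instance would live in a static field so every DoFn or reader instance in the JVM shares it. The clock is injectable so the idle logic can be checked without sleeping.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: lazily opens one shared resource and closes it once it sits idle. */
final class IdleClosingHolder<T extends AutoCloseable> {
  private final Supplier<T> opener;
  private final long idleNanos;
  private final LongSupplier nanoClock;
  private T resource;
  private long lastUsedNanos;

  IdleClosingHolder(Supplier<T> opener, Duration idleTimeout, LongSupplier nanoClock) {
    this.opener = opener;
    this.idleNanos = idleTimeout.toNanos();
    this.nanoClock = nanoClock;
  }

  /** Returns the shared resource, reopening it if the idle check closed it. */
  synchronized T acquire() {
    if (resource == null) {
      resource = opener.get();
    }
    lastUsedNanos = nanoClock.getAsLong();
    return resource;
  }

  synchronized boolean isOpen() {
    return resource != null;
  }

  /** Closes the resource if it has not been acquired for at least the idle timeout. */
  synchronized void closeIfIdle() {
    if (resource != null && nanoClock.getAsLong() - lastUsedNanos >= idleNanos) {
      try {
        resource.close();
      } catch (Exception e) {
        // A real connector would log this; the resource is dropped either way.
      }
      resource = null;
    }
  }

  /** Runs closeIfIdle() periodically on a daemon thread; the caller owns shutdown. */
  ScheduledExecutorService startIdleChecks(Duration period) {
    long periodMillis = Math.max(1, period.toMillis());
    ScheduledExecutorService checker = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "idle-connection-closer");
      t.setDaemon(true);
      return t;
    });
    checker.scheduleWithFixedDelay(
        this::closeIfIdle, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    return checker;
  }
}
```

Because only acquire() refreshes the last-used time, callers would call it for each unit of work (for example once per bundle) instead of caching the returned connection, otherwise a long-held reference could be closed underneath them.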

Assuming you need to ack on the same connection that served the records,
the finalize() functionality in the UnboundedSource API is important in your
case. You can use the UnboundedSource API for now.


Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-20 Thread flyisland
Hi Reuven,

There is no explicit ID in the message itself, and whether any information
can be used as an ID depends on the use case.


Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-20 Thread Reuven Lax
Is there information in the message that can be used as an ID for
deduplication?
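Since the messages carry no explicit ID, one option (an assumption, not something the server provides) is to derive an ID from the message bytes. In Beam's Java SDK an UnboundedSource can opt into runner-side deduplication by returning true from requiresDeduping(), and its reader then supplies a per-record ID from getCurrentRecordId(). The plain-Java sketch below shows only the ID derivation and why byte[] IDs need wrapping for equality checks; PayloadRecordId and SeenIds are hypothetical helpers. It is only safe if two genuinely different messages can never have identical bytes, which depends on the use case.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: derives a record ID from the raw message bytes. */
final class PayloadRecordId {
  private PayloadRecordId() {}

  /** SHA-256 of the payload; identical payloads always map to identical IDs. */
  static byte[] of(byte[] payload) {
    try {
      return MessageDigest.getInstance("SHA-256").digest(payload);
    } catch (NoSuchAlgorithmException e) {
      // Every Java SE implementation must support SHA-256, so this should not happen.
      throw new IllegalStateException(e);
    }
  }
}

/** Hypothetical in-memory dedup set; it grows without bound, so illustration only. */
final class SeenIds {
  private final Set<ByteBuffer> seen = new HashSet<>();

  /** Returns true the first time an ID is offered, false for repeats. */
  boolean firstTime(byte[] id) {
    // byte[] uses identity equality, so wrap a defensive copy in a ByteBuffer,
    // whose equals() and hashCode() compare the remaining bytes.
    return seen.add(ByteBuffer.wrap(id.clone()));
  }
}
```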


Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-20 Thread flyisland
Hi Lukasz,

With the current API we provide, messages cannot be acked from a different
client.

The server will re-send messages to the reconnected client if those
messages were not acked, so there will be duplicate messages, but each
redelivered message carries a "redeliver times" property in its header.

Thanks for the helpful information! I'll look into UnboundedSource.
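Given that acks must go through the same client, the bookkeeping between reads and checkpoints could look roughly like the plain-Java sketch below (hypothetical names throughout). The reader records each message ID it emits, each checkpoint takes exactly the IDs read since the previous one, and acking happens only when that checkpoint is finalized, i.e. after the runner has committed the output. IDs in a checkpoint that is never finalized are simply not acked, so the server redelivers them with the "redeliver times" header bumped. Because Beam encodes checkpoint marks with the source's getCheckpointMarkCoder(), the mark here carries only IDs and receives the ack function as a parameter instead of holding a live connection.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical reader-side bookkeeping of message IDs awaiting an ack. */
final class PendingAcks {
  private List<String> sinceLastCheckpoint = new ArrayList<>();

  /** Record an ID when the reader emits the message (e.g. from advance()). */
  synchronized void delivered(String messageId) {
    sinceLastCheckpoint.add(messageId);
  }

  /** Hand the IDs read so far to a new mark and start an empty batch for the next one. */
  synchronized Mark checkpoint() {
    Mark mark = new Mark(sinceLastCheckpoint);
    sinceLastCheckpoint = new ArrayList<>();
    return mark;
  }

  /** Stand-in for a CheckpointMark: plain data only, no connection reference. */
  static final class Mark {
    private final List<String> ids;

    Mark(List<String> ids) {
      this.ids = Collections.unmodifiableList(new ArrayList<>(ids));
    }

    List<String> ids() {
      return ids;
    }

    /** Called once the runner has committed this checkpoint's output. */
    void finalizeWith(Consumer<String> acker) {
      ids.forEach(acker);
    }
  }
}
```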



On Fri, Sep 21, 2018 at 2:09 AM Lukasz Cwik  wrote:

> Are duplicate messages ok?
>
> Can you ack messages from a different client, or are messages sticky to a
> single client (e.g. if one client loses its connection, when it reconnects
> can it ack messages it received, or are those messages automatically
> replayed)?
>
> UnboundedSource is currently the only "source" type that supports the
> finalization callbacks[1] you will need to ack messages, as well as
> deduplication[2]. SplittableDoFn will support both of these features, but
> they are not there yet.
>
> 1:
> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
> 2:
> https://github.com/apache/beam/blob/256fcdfe3ab0f6827195a262932e1267c4d4bfba/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L93

Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-19 Thread flyisland
Hi Lukasz,

This socket server is like an MQTT server: it has queues inside it, and the
client should ack each message.

> Is receiving and processing each message reliably important or is it ok
to drop messages when things fail?
A: Reliability is important; no messages should be lost.

> Is there a message acknowledgement system or can you request a position
within the message stream (e.g. send all messages from position X when
connecting and if for whatever reason you need to reconnect you can say
send messages from position X to replay past messages)?
A: The client should ack each message it receives, and the server deletes
acked messages. If the client breaks and the server does not receive an ack,
the server re-sends the message to the client when it is online again. There
is no "position" concept like in Kafka.
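To reason about where duplicates come from, the server behavior described above can be modeled in a few lines. This is a hypothetical, simplified model (one client at a time, string IDs, not thread-safe), not the real server's API: unacked messages return to the front of the queue on disconnect with their redelivery counter incremented, and acks from any connection other than the current one are rejected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the in-house server's ack and redelivery rules. */
final class AckQueueModel {
  /** A delivered message and how many times it has been redelivered. */
  static final class Delivery {
    final String id;
    final int redeliverTimes;

    Delivery(String id, int redeliverTimes) {
      this.id = id;
      this.redeliverTimes = redeliverTimes;
    }
  }

  private final Deque<Delivery> ready = new ArrayDeque<>();
  private final Map<String, Delivery> inFlight = new LinkedHashMap<>();
  private int currentSession = 0;
  private boolean connected = false;

  void publish(String id) {
    ready.addLast(new Delivery(id, 0));
  }

  /** Opens a new session; this model allows only one connected client at a time. */
  int connect() {
    if (connected) {
      throw new IllegalStateException("a client is already connected");
    }
    connected = true;
    return ++currentSession;
  }

  /** Next message for this session, or null if the queue is empty. */
  Delivery poll(int session) {
    requireCurrent(session);
    Delivery d = ready.pollFirst();
    if (d != null) {
      inFlight.put(d.id, d);
    }
    return d;
  }

  /** Acks succeed only on the current session and only for messages it has in flight. */
  boolean ack(int session, String id) {
    if (!connected || session != currentSession) {
      return false;
    }
    return inFlight.remove(id) != null;
  }

  /** Unacked messages go back to the front of the queue, in order, with the counter bumped. */
  void disconnect(int session) {
    requireCurrent(session);
    connected = false;
    List<Delivery> unacked = new ArrayList<>(inFlight.values());
    inFlight.clear();
    for (int i = unacked.size() - 1; i >= 0; i--) {
      Delivery d = unacked.get(i);
      ready.addFirst(new Delivery(d.id, d.redeliverTimes + 1));
    }
  }

  private void requireCurrent(int session) {
    if (!connected || session != currentSession) {
      throw new IllegalStateException("stale or closed session " + session);
    }
  }
}
```

Walking through it shows the duplicate case: a message read on a session that drops before its ack comes back with redeliverTimes 1, even if the pipeline had already processed it.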


On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik  wrote:

> Before getting into what you could use and the current state of
> SplittableDoFn and its supported features, I was wondering: what reliability
> guarantees does the socket server have around messages?
>
> Is receiving and processing each message reliably important or is it ok to
> drop messages when things fail?
> Is there a message acknowledgement system or can you request a position
> within the message stream (e.g. send all messages from position X when
> connecting and if for whatever reason you need to reconnect you can say
> send messages from position X to replay past messages)?
>
>
>
>
> On Tue, Sep 18, 2018 at 5:00 PM flyisland  wrote:
>
>>
>> Hi Gurus,
>>
>> I'm trying to create an IO connector in Beam to fetch data from a socket
>> server. I'm new to Beam, but according to this blog <
>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
>> that SDF is now the recommended way to implement an IO connector.
>>
>> This in-house socket server can accept multiple clients, but it only
>> sends messages to the first-connected client, and it sends messages to the
>> second client only after the first one disconnects.
>>
>> To understand the lifecycle of a DoFn, I created a very simple DoFn
>> subclass that calls log.debug() in every method. According to the JavaDoc
>> of DoFn.Setup(), "This is a good place to initialize transient in-memory
>> resources, such as network connections. The resources can then be
>> disposed in DoFn.Teardown." So I guess I should create the connection to
>> the socket server in the setup() method.
>>
>> But based on the log messages below, even though the input PCollection
>> has only one element, Beam still creates multiple DemoIO instances and
>> invokes a different DemoIO instance after every checkpoint.
>>
>> I'm wondering:
>> 1. How can I make Beam create only one DemoIO instance, or at least
>> consistently reuse the same instance?
>> 2. Or should I use the Source API for this purpose?
>>
>> Thanks in advance.
>>
>> Logs:
>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
>> First->getInitialRestriction() is called!
>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
>> 9223372036854775807)->newTracker() is called!
>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>> called!
>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:56.285Z -> 0 -> First
>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:56.786Z -> 1 -> First
>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
>> 9223372036854775807)->newTracker() is called!
>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>> called!
>> 07:15:58:358 [direct-runner-worker] 

Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-19 Thread Lukasz Cwik
Before getting into what you could use and the current state of
SplittableDoFn and its supported features, I was wondering what reliability
guarantees the socket server has around messages.

Is receiving and processing each message reliably important, or is it OK to
drop messages when things fail?
Is there a message acknowledgement system, or can you request a position
within the message stream (e.g., send all messages from position X when
connecting, and if for whatever reason you need to reconnect, ask for
messages from position X again to replay past messages)?
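To make the position-based option concrete, here is a minimal sketch of position-based replay. It assumes a server that can serve "all messages from position X"; `ReplayableLog` and `PositionTrackingClient` are hypothetical classes for illustration only. The client advances its committed position only after a message has been processed, so after a dropped connection it reconnects from that position and nothing is skipped. In this simplified model processing and committing happen together; in a real system a crash between the two could still cause a replayed duplicate.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical server-side log that can replay from any position. */
final class ReplayableLog {
    private final List<String> messages = new ArrayList<>();

    void append(String message) { messages.add(message); }

    /** Returns up to maxCount messages starting at the given 0-based position. */
    List<String> readFrom(long position, int maxCount) {
        int from = (int) Math.min(position, messages.size());
        int to = Math.min(from + maxCount, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

/** Client that advances its committed position only after a message is processed. */
final class PositionTrackingClient {
    private long committedPosition;
    private final List<String> processed = new ArrayList<>();

    PositionTrackingClient(long startPosition) { this.committedPosition = startPosition; }

    /**
     * Simulates one connection that asks for "messages from committedPosition".
     * If failAt >= 0, the connection drops before the failAt-th message of the batch.
     */
    void readBatch(ReplayableLog log, int maxCount, int failAt) {
        List<String> batch = log.readFrom(committedPosition, maxCount);
        for (int i = 0; i < batch.size(); i++) {
            if (i == failAt) {
                throw new IllegalStateException("connection dropped at position " + committedPosition);
            }
            processed.add(batch.get(i));
            committedPosition++; // commit only after successful processing
        }
    }

    long committedPosition() { return committedPosition; }

    List<String> processed() { return processed; }
}
```

In a pipeline, the committed position would be the natural thing to record in a checkpoint. That is an assumption about a possible design, not something the server in this thread supports.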




On Tue, Sep 18, 2018 at 5:00 PM flyisland  wrote:

>
> Hi Gurus,
>
> I'm trying to create an IO connector to fetch data from a socket server
> in Beam. I'm new to Beam, but according to this blog post <
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
> that SDF is now the recommended way to implement an IO connector.
>
> This in-house socket server can accept multiple clients, but it only
> sends messages to the first connected client; it starts sending messages
> to the second client only after the first one disconnects.
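A hypothetical in-memory model of that delivery rule shows the consequence for a connector. The class and method names are invented for illustration. Whichever connection was opened first receives every message, and any later connection receives nothing until the earlier one disconnects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the server's delivery rule: only the earliest
 * still-connected client receives messages; the next one in connect order
 * takes over when it disconnects.
 */
final class ExclusiveConsumerServer {
    private final ArrayDeque<Integer> connected = new ArrayDeque<>(); // in connect order
    private final Map<Integer, List<String>> inboxes = new HashMap<>();
    private int nextId = 0;

    int connect() {
        int id = nextId++;
        connected.addLast(id);
        inboxes.put(id, new ArrayList<>());
        return id;
    }

    void disconnect(int id) { connected.remove(Integer.valueOf(id)); }

    /** Sends to the active (first-connected) client; returns its id, or -1 if nobody is connected. */
    int send(String message) {
        Integer active = connected.peekFirst();
        if (active == null) {
            return -1;
        }
        inboxes.get(active).add(message);
        return active;
    }

    List<String> inbox(int id) { return inboxes.get(id); }
}
```

If a DoFn instance that is no longer doing work keeps its connection open, a newer instance connected to the same server simply starves.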
>
> To understand the lifecycle of a DoFn, I created a very simple DoFn
> subclass that calls log.debug() in every method. According to the
> JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
> in-memory resources, such as network connections. The resources can then be
> disposed in DoFn.Teardown." So I guess I should create the connection to
> the socket server in the setup() method.
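Under the delivery rule described above, several DoFn instances in the same JVM that each open a connection in setup() would compete as separate clients. One possible workaround is to share a single, reference-counted connection. This is a sketch under stated assumptions: `SharedConnection`, `FakeConnection`, and `DemoReader` are hypothetical, and `setup()`/`teardown()` only mimic the `@Setup`/`@Teardown` hooks rather than using Beam's API. It does not help when work moves to another worker. Teardown is also only called on a best-effort basis, so a server-side timeout for idle clients is still advisable.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in for a real socket connection (hypothetical). */
final class FakeConnection {
    static final AtomicInteger OPENED = new AtomicInteger();
    private volatile boolean open = true;

    FakeConnection() { OPENED.incrementAndGet(); }

    boolean isOpen() { return open; }

    void close() { open = false; }
}

/** One connection per JVM, shared by every instance that currently holds a reference. */
final class SharedConnection {
    private static FakeConnection connection;
    private static int refCount = 0;

    static synchronized FakeConnection acquire() {
        if (connection == null) {
            connection = new FakeConnection(); // first user opens the single connection
        }
        refCount++;
        return connection;
    }

    static synchronized void release() {
        if (refCount == 0) {
            return; // nothing to release
        }
        refCount--;
        if (refCount == 0) {
            connection.close(); // last user gone: free the server's "first client" slot
            connection = null;
        }
    }
}

/** Mimics a DoFn's setup/teardown hooks; not Beam's actual API. */
final class DemoReader {
    private FakeConnection conn;

    void setup() { conn = SharedConnection.acquire(); }

    void teardown() {
        SharedConnection.release();
        conn = null;
    }

    FakeConnection connection() { return conn; }
}
```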
>
> But based on the log messages below, even though the input PCollection has
> only one element, Beam still creates multiple DemoIO instances and invokes
> a different DemoIO instance after every checkpoint.
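The ranges in the logs below follow from how an offset-range restriction is checkpointed. The tracker here is a simplified, hypothetical re-implementation written to reproduce the logged values. It is not Beam's `OffsetRangeTracker` source.

- `tryClaim` records every attempted offset and fails once the offset reaches the end of the range.
- `checkpoint` shrinks the current range to end just after the last attempted offset and returns the remainder as the residual restriction.

That residual is what the `[2, 9223372036854775807)->newTracker()` log line receives. Note that 9223372036854775807 is `Long.MAX_VALUE`.

```java
/** Half-open offset range [from, to). */
final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public String toString() { return "[" + from + ", " + to + ")"; }
}

/** Simplified tracker mirroring the values in the DemoIO logs (hypothetical, not Beam's class). */
final class SimpleOffsetTracker {
    private Range range;
    private Long lastClaimedOffset;   // null until an offset is claimed
    private Long lastAttemptedOffset; // null until an offset is attempted

    SimpleOffsetTracker(Range range) { this.range = range; }

    /** Records the attempt; succeeds only while the offset is inside the current range. */
    boolean tryClaim(long offset) {
        lastAttemptedOffset = offset;
        if (offset >= range.to) {
            return false;
        }
        lastClaimedOffset = offset;
        return true;
    }

    /** Keeps [from, split) as the primary range and returns [split, to) as the residual. */
    Range checkpoint() {
        long split;
        if (lastAttemptedOffset == null) {
            split = range.from;            // nothing attempted: defer the whole range
        } else if (lastAttemptedOffset >= range.to) {
            split = range.to;              // already past the end: empty residual
        } else {
            split = lastAttemptedOffset + 1;
        }
        Range residual = new Range(split, range.to);
        range = new Range(range.from, split);
        return residual;
    }

    Range currentRange() { return range; }

    Long lastClaimedOffset() { return lastClaimedOffset; }

    Long lastAttemptedOffset() { return lastAttemptedOffset; }
}
```

Consider claiming offsets 0 and 1, checkpointing, and then attempting offset 2. The tracker is left at `range=[0, 2), lastClaimedOffset=1, lastAttemptedOffset=2`, which matches the first `process(...) end!` line. The residual `[2, 9223372036854775807)` is processed next, in the logs by a new DemoIO instance.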
>
> I'm wondering:
> 1. How can I make Beam create only one DemoIO instance, or at least
> consistently reuse the same instance?
> 2. Or should I use the Source API for this purpose?
>
> Thanks in advance.
>
> Logs:
> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
> First->getInitialRestriction() is called!
> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
> 9223372036854775807)->newTracker() is called!
> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
> called!
> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:56.285Z -> 0 -> First
> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:56.786Z -> 1 -> First
> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
> 9223372036854775807)->newTracker() is called!
> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
> called!
> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
> 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:57.354Z -> 2 -> First
> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@142109e->setup() is called!
> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:57.856Z -> 3 -> First
> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:58.358Z -> 4 -> First
> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5,
> 9223372036854775807)->newTracker() is called!
> 07:15:58:375