Re: GroupByKey and number of workers

2019-01-02 Thread Reza Ardeshir Rokni
Hi Mohamed,

I believe this is related to fusion, an optimization performed by some of the
runners. You can find more information on fusion at:

https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization
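
To make the fusion point concrete, here is a minimal sketch (my own, not taken
from the guide) of the split / reshuffle / read pattern in Java. FileRange,
splitIntoRanges() and readRange() are hypothetical placeholders for whatever
your source actually uses; Reshuffle is built on top of GroupByKey:

PCollection<String> lines =
    pipeline
        .apply("ListInputs", Create.of(inputPaths))
        .apply("Split", ParDo.of(new DoFn<String, FileRange>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            for (FileRange range : splitIntoRanges(c.element())) {
              c.output(range);
            }
          }
        }))
        // The GroupByKey inside Reshuffle materializes the ranges, so the
        // runner treats "Split" and "Read" as separate stages and can scale
        // the read fan-out independently instead of fusing both ParDos onto
        // the same workers.
        .apply("BreakFusion", Reshuffle.viaRandomKey())
        .apply("Read", ParDo.of(new DoFn<FileRange, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            for (String line : readRange(c.element())) {
              c.output(line);
            }
          }
        }));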

Cheers

Reza

On Thu, 3 Jan 2019 at 04:09, Mohamed Haseeb  wrote:

> Hi,
>
> As per the Authoring I/O Transforms guide, the
> recommended way to implement a Read transform (from a source that can be
> read in parallel) has these steps:
> - Splitting the data into parts to be read in parallel (ParDo)
> - Reading from each of those parts (ParDo)
> - With a GroupByKey in between the two ParDos
> The stated motivation for the GroupByKey is that "it allows the runner to use
> different numbers of workers" for the splitting and reading parts. Can
> someone elaborate (or point to some relevant docs) on how the GroupByKey
> will enable using different numbers of workers for the two ParDo steps?
>
> Thanks,
> Mohamed
>


GroupByKey and number of workers

2019-01-02 Thread Mohamed Haseeb
Hi,

As per the Authoring I/O Transforms guide, the
recommended way to implement a Read transform (from a source that can be
read in parallel) has these steps:
- Splitting the data into parts to be read in parallel (ParDo)
- Reading from each of those parts (ParDo)
- With a GroupByKey in between the two ParDos
The stated motivation for the GroupByKey is that "it allows the runner to use
different numbers of workers" for the splitting and reading parts. Can
someone elaborate (or point to some relevant docs) on how the GroupByKey
will enable using different numbers of workers for the two ParDo steps?

Thanks,
Mohamed


Re: KafkaIO and added partitions

2019-01-02 Thread Raghu Angadi
+1, we should do it.
The implementation could be something along these lines:

- While assigning Kafka partitions to each source split during the first
  run, assign them deterministically (a rough sketch follows this list).
  - The current round-robin assignment works fine for a single topic, but it
    is not deterministic when reading from more than one topic. We need to
    tweak the assignment to work well in that case.
- On the worker, each reader should check the partitions for its input topic
  (this can be part of the existing periodic thread that checks the backlog).
- When partitions are added:
  - The readers (source splits) that the new partitions belong to will
    start consuming from them. This is straightforward.
  - What if a new partition's watermark is older than the current
    watermark? We can't do much about it, since a watermark cannot go backwards.
- When partitions are deleted:
  - This is a bit trickier.
  - We need to handle the case where a source split no longer has any
    partitions assigned.
    - What should its watermark be? I think the current wall time makes
      sense. Note that new partitions could be added later.
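
For the deterministic assignment above, a rough sketch (just an illustration,
not the actual KafkaIO code) could look like the following. Sorting the
(topic, partition) pairs before the round-robin makes the result independent
of the order in which Kafka happens to return the metadata:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

class DeterministicPartitionAssignment {
  // Assign each partition to a split by round-robin over a sorted list, so a
  // given partition always lands on the same split index across runs.
  static List<List<TopicPartition>> assign(List<TopicPartition> partitions, int numSplits) {
    List<TopicPartition> sorted = new ArrayList<>(partitions);
    sorted.sort(Comparator.comparing(TopicPartition::topic)
        .thenComparingInt(TopicPartition::partition));
    List<List<TopicPartition>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % numSplits).add(sorted.get(i));
    }
    return splits;
  }
}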


On Wed, Jan 2, 2019 at 7:59 AM Alexey Romanenko 
wrote:

> I just wanted to mention that there is a quite old open issue about that:
> https://issues.apache.org/jira/browse/BEAM-727
>
> Feel free to take this one if anyone is interested.
>
> On 2 Jan 2019, at 15:22, Juan Carlos Garcia  wrote:
>
> +1
>
On Wed, 2 Jan 2019 at 14:34, Abdul Qadeer 
> wrote:
>
>> +1
>>
>> On Tue, 1 Jan 2019 at 12:45,  wrote:
>>
>>> +1 from my side too :-)
>>> And ideally I would want to have some hooks to let me know the extra
>>> partitions have been picked up (or a way to query it).
>>>
>>> Although if that can't be provided I can work around it myself by
>>> sending some specific message to the partition that somewhere results in a
>>> visible state change in the pipeline.
>>>
>>> Also, as a quick (semi related) heads up: I will very likely soon
>>> contribute a change to the LogAppendTimePolicy so that the idle partition
>>> behavior (automatic watermark generation) can be disabled.
>>>
>>> (of course all related to my streamy-db project)
>>>
>>> Kind regards,
>>> Jan
>>>
>>>
>>> On Tue, 1 Jan 2019 at 08:19, Ramesh Nethi 
>>> wrote:
>>>
 +1 for this capability.  This would enable pipelines to continue to run
 when such changes need to be made.

 regards
 Ramesh

 On Fri, 23 Nov 2018 at 00:40 Raghu Angadi  wrote:

> On Thu, Nov 22, 2018 at 10:10 AM Raghu Angadi 
> wrote:
>
>> - New partitions will be ignored during runtime.
>> - Update will not succeed either. Error message on the workers should
>> explain the mismatch.
>>
>
> This is the current state. Supporting changes to the number of partitions
> is quite doable if there is enough user interest (even in the current
> UnboundedSource API framework).
>
>>
>> On Thu, Nov 22, 2018 at 2:15 AM Jozef Vilcek 
>> wrote:
>>
>>> Hello,
>>> just wanted to check how Beam KafkaIO behaves when partitions
>>> are added to the topic.
>>> Will they be picked up or ignored during runtime?
>>> Will they be picked up on restart with state restore?
>>>
>>> Thanks,
>>> Jozef
>>>
>>
>


Re: Using gRPC with PubsubIO?

2019-01-02 Thread Jeff Klukas
I believe this explains why I have been observing Pubsub write errors (about
messages being too large) in the logs for the Dataflow "shuffler" rather than
the workers.

The specific error I saw was about a 7 MB message becoming too large, once
base64 encoded, to meet Pubsub requirements (10 MB max message size), which
makes me think that the Dataflow Pubsub writer was still using JSON rather
than gRPC. But it sounds like this is not configurable from the client, and
Google has full control over the details of how Pubsub writing and reading
work in Dataflow jobs.
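
As a back-of-the-envelope check (my own arithmetic, not something from the
Pubsub docs): standard base64 expands a payload by a factor of roughly 4/3,
which already pushes a 7 MB message close to the 10 MB limit before the JSON
envelope and attributes are counted.

// Rough size check: base64 output length is 4 * ceil(n / 3) bytes.
long payloadBytes = 7L * 1024 * 1024;             // ~7 MiB binary payload
long base64Bytes = 4 * ((payloadBytes + 2) / 3);  // ~9.3 MiB once encoded
System.out.printf("payload: %.1f MiB, base64: %.1f MiB%n",
    payloadBytes / (1024.0 * 1024.0), base64Bytes / (1024.0 * 1024.0));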


On Wed, Jan 2, 2019 at 1:04 PM Steve Niemitz  wrote:

> Something to consider: if you're running in Dataflow, the entire Pubsub
> read step becomes a noop [1], and the underlying streaming implementation
> itself handles reading from pubsub (either windmill or the streaming
> engine).
>
> [1]
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L373
>
> On Wed, Jan 2, 2019 at 12:11 PM Jeff Klukas  wrote:
>
>> I see that the Beam codebase includes a PubsubGrpcClient, but there
>> doesn't appear to be any way to configure PubsubIO to use that client over
>> the PubsubJsonClient.
>>
>> There's even a PubsubIO.Read#withClientFactory, but it's marked as for
>> testing only.
>>
>> Is gRPC support something that's still in development? Or am I missing
>> something about how to configure this?
>>
>> I'm particularly interested in using gRPC due to the message size
>> inflation of base64 encoding required for JSON transport. My payloads are
>> all below the 10 MB Pubsub limit, but I need to support some near the top
>> end of that range that are currently causing errors due to base64 inflation.
>>
>


Re: Using gRPC with PubsubIO?

2019-01-02 Thread Steve Niemitz
Something to consider: if you're running in Dataflow, the entire Pubsub
read step becomes a noop [1], and the underlying streaming implementation
itself handles reading from pubsub (either windmill or the streaming
engine).

[1]
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L373

On Wed, Jan 2, 2019 at 12:11 PM Jeff Klukas  wrote:

> I see that the Beam codebase includes a PubsubGrpcClient, but there
> doesn't appear to be any way to configure PubsubIO to use that client over
> the PubsubJsonClient.
>
> There's even a PubsubIO.Read#withClientFactory, but it's marked as for
> testing only.
>
> Is gRPC support something that's still in development? Or am I missing
> something about how to configure this?
>
> I'm particularly interested in using gRPC due to the message size
> inflation of base64 encoding required for JSON transport. My payloads are
> all below the 10 MB Pubsub limit, but I need to support some near the top
> end of that range that are currently causing errors due to base64 inflation.
>


cyclic (non-DAG) computations

2019-01-02 Thread jan.doms
Hi all,

In the documentation it says that a pipeline forms a DAG.
For my project, however, I need to introduce a loop (feed back some results
to an earlier phase of the pipeline).
For now I have solved this using a separate Kafka topic, which I read from
near the beginning of the pipeline and only write to from a later stage.

Using an extra Kafka topic of course breaks the flow of event time and
watermarks. Also, I'm getting at-least-once behavior instead of exactly-once.
For my application these behaviors are acceptable (although it could
perhaps come in handy if I could keep control over the flow of watermarks,
but it's not absolutely needed).

Is there a better (more efficient) way to achieve this?

I was thinking of creating a 'pipe'.

interface Pipe<T> {
  PTransform<PBegin, PCollection<T>> getSource();
  PTransform<PCollection<T>, PDone> getSink();
}

<T> Pipe<T> createPipe(String name);

The source and sink should be internally connected and participate
correctly in the snapshotting flow. (I'm not sure yet about the details
required to make this correct; any help/pointers would be appreciated!)
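
Purely for illustration, here is how I imagine wiring such a pipe into a
pipeline; Event, readFromKafka and process are placeholders for my own types
and transforms, and the pipe semantics are the ones sketched above:

Pipe<Event> feedback = createPipe("feedback-loop");

// Read the fed-back elements at the start of the pipeline and merge them
// with the regular input before processing.
PCollection<Event> looped = pipeline.apply("ReadFeedback", feedback.getSource());
PCollection<Event> input = readFromKafka(pipeline);
PCollection<Event> merged =
    PCollectionList.of(input).and(looped).apply(Flatten.pCollections());

// Later in the pipeline, route selected results back into the loop.
PCollection<Event> toFeedBack = process(merged);
toFeedBack.apply("WriteFeedback", feedback.getSink());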

However, before I embark on this effort I would like to hear your thoughts
about it.

Thanks, kind regards,
Jan


Re: KafkaIO and added partitions

2019-01-02 Thread Juan Carlos Garcia
+1

On Wed, 2 Jan 2019 at 14:34, Abdul Qadeer 
wrote:

> +1
>
> On Tue, 1 Jan 2019 at 12:45,  wrote:
>
>> +1 from my side too :-)
>> And ideally I would want to have some hooks to let me know the extra
>> partitions have been picked up (or a way to query it).
>>
>> Although if that can't be provided I can work around it myself by sending
>> some specific message to the partition that somewhere results in a visible
>> state change in the pipeline.
>>
>> Also, as a quick (semi related) heads up: I will very likely soon
>> contribute a change to the LogAppendTimePolicy so that the idle partition
>> behavior (automatic watermark generation) can be disabled.
>>
>> (of course all related to my streamy-db project)
>>
>> Kind regards,
>> Jan
>>
>>
>> On Tue, 1 Jan 2019 at 08:19, Ramesh Nethi  wrote:
>>
>>> +1 for this capability.  This would enable pipelines to continue to run
>>> when such changes need to be made.
>>>
>>> regards
>>> Ramesh
>>>
>>> On Fri, 23 Nov 2018 at 00:40 Raghu Angadi  wrote:
>>>
 On Thu, Nov 22, 2018 at 10:10 AM Raghu Angadi 
 wrote:

> - New partitions will be ignored during runtime.
> - Update will not succeed either. Error message on the workers should
> explain the mismatch.
>

 This is the current state. Supporting changes to the number of partitions is
 quite doable if there is enough user interest (even in the current
 UnboundedSource API framework).

>
> On Thu, Nov 22, 2018 at 2:15 AM Jozef Vilcek 
> wrote:
>
>> Hello,
>> just wanted to check how Beam KafkaIO behaves when partitions
>> are added to the topic.
>> Will they be picked up or ignored during runtime?
>> Will they be picked up on restart with state restore?
>>
>> Thanks,
>> Jozef
>>
>


Re: KafkaIO and added partitions

2019-01-02 Thread Abdul Qadeer
+1

On Tue, 1 Jan 2019 at 12:45,  wrote:

> +1 from my side too :-)
> And ideally I would want to have some hooks to let me know the extra
> partitions have been picked up (or a way to query it).
>
> Although if that can't be provided I can work around it myself by sending
> some specific message to the partition that somewhere results in a visible
> state change in the pipeline.
>
> Also, as a quick (semi related) heads up: I will very likely soon
> contribute a change to the LogAppendTimePolicy so that the idle partition
> behavior (automatic watermark generation) can be disabled.
>
> (of course all related to my streamy-db project)
>
> Kind regards,
> Jan
>
>
> On Tue, 1 Jan 2019 at 08:19, Ramesh Nethi  wrote:
>
>> +1 for this capability.  This would enable pipelines to continue to run
>> when such changes need to be made.
>>
>> regards
>> Ramesh
>>
>> On Fri, 23 Nov 2018 at 00:40 Raghu Angadi  wrote:
>>
>>> On Thu, Nov 22, 2018 at 10:10 AM Raghu Angadi 
>>> wrote:
>>>
 - New partitions will be ignored during runtime.
 - Update will not succeed either. Error message on the workers should
 explain the mismatch.

>>>
>>> This is the current state. Supporting changes to the number of partitions is
>>> quite doable if there is enough user interest (even in the current
>>> UnboundedSource API framework).
>>>

 On Thu, Nov 22, 2018 at 2:15 AM Jozef Vilcek 
 wrote:

> Hello,
> just wanted to check how Beam KafkaIO behaves when partitions are
> added to the topic.
> Will they be picked up or ignored during runtime?
> Will they be picked up on restart with state restore?
>
> Thanks,
> Jozef
>