Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
Well, if every input record's timestamp is X, watermark staying at X is the
right answer, no? But I am not sure where the disagreement is, actually. I
might be mistaken.

KafkaIO has a few built-in policies for watermark and timestamp that cover
most use cases (including server time, which has the benefit of providing a
perfect watermark). It also gives users fairly complete control over these if
they choose to. It seems reasonable for a policy to base its watermark only on
parsable records and to ignore unparsable records in the watermark
calculation. It could even assign a timestamp that makes more logical sense in
a particular application.
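
For illustration, a rough sketch of such a policy in Java (tryParseTimestamp()
is a hypothetical helper and idle-partition handling is omitted; it would be
wired in through a TimestampPolicyFactory via KafkaIO's
withTimestampPolicyFactory()):

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Sketch only: the watermark is based solely on records whose timestamp parses.
// Unparsable records reuse the last good timestamp (or the minimum timestamp if
// none has been seen yet) and never move the watermark.
class ParsableRecordsTimestampPolicy<K, V> extends TimestampPolicy<K, V> {
  private Instant lastParsedTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
    Optional<Instant> parsed = tryParseTimestamp(record); // hypothetical helper
    if (parsed.isPresent()) {
      lastParsedTimestamp = parsed.get();
    }
    return lastParsedTimestamp;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return lastParsedTimestamp;
  }

  private Optional<Instant> tryParseTimestamp(KafkaRecord<K, V> record) {
    return Optional.empty(); // stand-in; a real policy would parse the record value
  }
}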

On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles  wrote:

> Forgive me if this is naive or missing something, but here are my thoughts
> on these alternatives:
>
> (0) Timestamp has to be pulled out in the source to control the watermark.
> Luke's point is imortant.
>
> (1) If bad records get min_timestamp, and they occur infrequently enough,
> then watermark will advance and they will all be dropped. That will not
> allow output to a dead-letter queue.
>
> (2) If you have always min_timestamp records, or if bad records are
> frequent, the watermark will never advance. So windows/aggregations would
> never be considered complete. Triggers could be used to get output anyhow,
> but it would never be a final answer. I think it is not in the spirit of
> Beam to work this way. Pragmatically, no state could ever be freed by a
> runner.
>
> In SQL there is an actual "dead letter" option when creating a table that
> parses from a bytes source. If, for example, a JSON record cannot be parsed
> to the expected schema - like maybe an avro record got in the stream, or
> the JSON doesn't match the expected schema - it is output as-is to a
> user-specified dead letter queue. I think this same level of support is
> also required for records that cannot have timestamps extracted in an
> unbounded source.
>
> In an SDF I think the function has enough control to do it all in
> "userland", so Cham is right on here.
>
> Kenn
>
> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik  wrote:
>
>> That depends on the users pipeline and how watermark advancement of the
>> source may impact elements becoming droppably late if they are emitted with
>> the minimum timestamp.
>>
>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi  wrote:
>>
>>> I see.
>>>
>>> What I meant was to return min_timestamp for bad records in the
>>> timestamp handler passed to KafkaIO itself, and correct timestamp for
>>> parsable records. That should work too, right?
>>>
>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>>>
 Yes, that would be fine.

 The user could then use a ParDo which outputs to a DLQ for things it
 can't parse the timestamp for and use outputWithTimestamp[1] for everything
 else.

 1:
 https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-

 On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi 
 wrote:

> Thanks. So returning  min timestamp is OK, right (assuming application
> fine is with what it means)?
>
> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>
>> All records in Apache Beam have a timestamp. The default timestamp is
>> the min timestamp defined here:
>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>
>>
>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
>> wrote:
>>
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik 
>>> wrote:
>>>
 You would have to return min timestamp for all records otherwise
 the watermark may have advanced and you would be outputting records 
 that
 are droppably late.

>>>
>>> That would be fine I guess. What’s the timestamp for a record that
>>> doesn’t have one?
>>>
>>>
 On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
 wrote:

> To be clear, returning min_timestamp for unparsable records shound
> not affect the watermark.
>
> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
> wrote:
>
>> How about returning min_timestamp? The would be dropped or
>> redirected by the ParDo after that.
>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>> API, is this pipeline defined under kafkaio package?
>>
>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
>> wrote:
>>
>>> In this case, the user is attempting to handle errors when
>>> parsing the timestamp. The timestamp controls the watermark for the
>>> UnboundedSource, how would they control the watermark in a 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Kenneth Knowles
Forgive me if this is naive or missing something, but here are my thoughts
on these alternatives:

(0) The timestamp has to be pulled out in the source to control the watermark.
Luke's point is important.

(1) If bad records get min_timestamp, and they occur infrequently enough,
then the watermark will advance and they will all be dropped. That will not
allow output to a dead-letter queue.

(2) If you always have min_timestamp records, or if bad records are frequent,
the watermark will never advance. So windows/aggregations would
never be considered complete. Triggers could be used to get output anyhow,
but it would never be a final answer. I think it is not in the spirit of
Beam to work this way. Pragmatically, no state could ever be freed by a
runner.

In SQL there is an actual "dead letter" option when creating a table that
parses from a bytes source. If, for example, a JSON record cannot be parsed
to the expected schema - like maybe an avro record got in the stream, or
the JSON doesn't match the expected schema - it is output as-is to a
user-specified dead letter queue. I think this same level of support is
also required for records that cannot have timestamps extracted in an
unbounded source.

In an SDF I think the function has enough control to do it all in
"userland", so Cham is right on here.

Kenn

On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik  wrote:

> That depends on the users pipeline and how watermark advancement of the
> source may impact elements becoming droppably late if they are emitted with
> the minimum timestamp.
>
> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi  wrote:
>
>> I see.
>>
>> What I meant was to return min_timestamp for bad records in the timestamp
>> handler passed to KafkaIO itself, and correct timestamp for parsable
>> records. That should work too, right?
>>
>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>>
>>> Yes, that would be fine.
>>>
>>> The user could then use a ParDo which outputs to a DLQ for things it
>>> can't parse the timestamp for and use outputWithTimestamp[1] for everything
>>> else.
>>>
>>> 1:
>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>
>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:
>>>
 Thanks. So returning  min timestamp is OK, right (assuming application
 fine is with what it means)?

 On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:

> All records in Apache Beam have a timestamp. The default timestamp is
> the min timestamp defined here:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>
>
> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
> wrote:
>
>>
>>
>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik 
>> wrote:
>>
>>> You would have to return min timestamp for all records otherwise the
>>> watermark may have advanced and you would be outputting records that are
>>> droppably late.
>>>
>>
>> That would be fine I guess. What’s the timestamp for a record that
>> doesn’t have one?
>>
>>
>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>>> wrote:
>>>
 To be clear, returning min_timestamp for unparsable records shound
 not affect the watermark.

 On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
 wrote:

> How about returning min_timestamp? The would be dropped or
> redirected by the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
> API, is this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
> wrote:
>
>> In this case, the user is attempting to handle errors when
>> parsing the timestamp. The timestamp controls the watermark for the
>> UnboundedSource, how would they control the watermark in a 
>> downstream ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>> wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Ah nice. Yeah, if user can return full bytes instead of
 applying a function that would result in an exception,  this can be
 extracted by a ParDo down the line.

>>>
>>> KafkaIO does return bytes, and I think most sources should,
>>> unless there is a good reason not to.
>>> Given that, do we think Beam should provide a tranform that
>>> makes to simpler to handle deadletter output? I think there was a 
>>> thread
>>> about it in the past.
>>>
>>>

 On Tue, Oct 23, 2018 at 11:14 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
That depends on the user's pipeline and how watermark advancement of the
source may impact elements becoming droppably late if they are emitted with
the minimum timestamp.

On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi  wrote:

> I see.
>
> What I meant was to return min_timestamp for bad records in the timestamp
> handler passed to KafkaIO itself, and correct timestamp for parsable
> records. That should work too, right?
>
> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>
>> Yes, that would be fine.
>>
>> The user could then use a ParDo which outputs to a DLQ for things it
>> can't parse the timestamp for and use outputWithTimestamp[1] for everything
>> else.
>>
>> 1:
>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>
>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:
>>
>>> Thanks. So returning  min timestamp is OK, right (assuming application
>>> fine is with what it means)?
>>>
>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>>>
 All records in Apache Beam have a timestamp. The default timestamp is
 the min timestamp defined here:
 https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48


 On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
 wrote:

>
>
> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>
>> You would have to return min timestamp for all records otherwise the
>> watermark may have advanced and you would be outputting records that are
>> droppably late.
>>
>
> That would be fine I guess. What’s the timestamp for a record that
> doesn’t have one?
>
>
>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>> wrote:
>>
>>> To be clear, returning min_timestamp for unparsable records shound
>>> not affect the watermark.
>>>
>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>>> wrote:
>>>
 How about returning min_timestamp? The would be dropped or
 redirected by the ParDo after that.
 Btw, TimestampPolicyFactory.withTimestampFn() is not a public API,
 is this pipeline defined under kafkaio package?

 On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
 wrote:

> In this case, the user is attempting to handle errors when parsing
> the timestamp. The timestamp controls the watermark for the
> UnboundedSource, how would they control the watermark in a downstream 
> ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
> wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Ah nice. Yeah, if user can return full bytes instead of applying
>>> a function that would result in an exception,  this can be 
>>> extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should,
>> unless there is a good reason not to.
>> Given that, do we think Beam should provide a tranform that makes
>> to simpler to handle deadletter output? I think there was a thread 
>> about it
>> in the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>> jcgarc...@gmail.com> wrote:
>>>
 As Raghu said,

 Just apply a regular ParDo and return a PCollectionTuple afert
 that you can extract your Success Records (TupleTag) and your 
 DeadLetter
 records(TupleTag) and do whatever you want with them.


 Raghu Angadi  schrieb am Mi., 24. Okt.
 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize
> explicitly in a ParDo, which gives complete control on how to 
> handle record
> errors. This is I would do if I need to in my pipeline.
>
> If there is a transform in Beam that does this, it could be
> convenient for users in many such scenarios. This is simpler than 
> each
> source supporting it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Given that KafkaIO uses UnboundeSource framework, this is
>> probably not something that can easily be supported. We might be 
>> able to
>> support similar features when we have Kafka on top of Splittable 
>> DoFn
>> though.
>>
> So feel free to create a feature 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
I see.

What I meant was to return min_timestamp for bad records in the timestamp
handler passed to KafkaIO itself, and the correct timestamp for parsable
records. That should work too, right?
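
For example, a minimal sketch of that handler (parseEventTime() is a
hypothetical parser; note that, as mentioned later in this thread,
TimestampPolicyFactory.withTimestampFn() may not be intended as a public API):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Sketch: parsable records get their real event time; bad records get the
// minimum timestamp so a downstream ParDo can redirect them to a dead-letter output.
SerializableFunction<KafkaRecord<String, String>, Instant> timestampFn =
    record -> {
      try {
        return parseEventTime(record.getKV().getValue()); // hypothetical parser
      } catch (Exception e) {
        return BoundedWindow.TIMESTAMP_MIN_VALUE;
      }
    };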

On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:

> Yes, that would be fine.
>
> The user could then use a ParDo which outputs to a DLQ for things it can't
> parse the timestamp for and use outputWithTimestamp[1] for everything else.
>
> 1:
> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>
> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:
>
>> Thanks. So returning  min timestamp is OK, right (assuming application
>> fine is with what it means)?
>>
>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>>
>>> All records in Apache Beam have a timestamp. The default timestamp is
>>> the min timestamp defined here:
>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
>>> wrote:
>>>


 On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:

> You would have to return min timestamp for all records otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.
>

 That would be fine I guess. What’s the timestamp for a record that
 doesn’t have one?


> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
> wrote:
>
>> To be clear, returning min_timestamp for unparsable records shound
>> not affect the watermark.
>>
>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>> wrote:
>>
>>> How about returning min_timestamp? The would be dropped or
>>> redirected by the ParDo after that.
>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API,
>>> is this pipeline defined under kafkaio package?
>>>
>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
>>> wrote:
>>>
 In this case, the user is attempting to handle errors when parsing
 the timestamp. The timestamp controls the watermark for the
 UnboundedSource, how would they control the watermark in a downstream 
 ParDo?

 On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
 wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Ah nice. Yeah, if user can return full bytes instead of applying
>> a function that would result in an exception,  this can be extracted 
>> by a
>> ParDo down the line.
>>
>
> KafkaIO does return bytes, and I think most sources should, unless
> there is a good reason not to.
> Given that, do we think Beam should provide a tranform that makes
> to simpler to handle deadletter output? I think there was a thread 
> about it
> in the past.
>
>
>>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>> jcgarc...@gmail.com> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple afert
>>> that you can extract your Success Records (TupleTag) and your 
>>> DeadLetter
>>> records(TupleTag) and do whatever you want with them.
>>>
>>>
>>> Raghu Angadi  schrieb am Mi., 24. Okt.
>>> 2018, 05:18:
>>>
 User can read serialized bytes from KafkaIO and deserialize
 explicitly in a ParDo, which gives complete control on how to 
 handle record
 errors. This is I would do if I need to in my pipeline.

 If there is a transform in Beam that does this, it could be
 convenient for users in many such scenarios. This is simpler than 
 each
 source supporting it explicitly.

 On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Given that KafkaIO uses UnboundeSource framework, this is
> probably not something that can easily be supported. We might be 
> able to
> support similar features when we have Kafka on top of Splittable 
> DoFn
> though.
>
 So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
> k...@google.com> wrote:
>
>> This is a great question. I've added the dev list to be sure
>> it gets noticed by whoever may know best.
>>

Re: Is it possible to run a perl script in Dataflow worker?

2018-10-24 Thread Reza Rokni
Hi,

Not directly connected (it's for the Java SDK), but some of the concepts in
these materials may be useful:

https://cloud.google.com/blog/products/gcp/running-external-libraries-with-cloud-dataflow-for-grid-computing-workloads

https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/subprocess

https://cloud.google.com/solutions/running-external-binaries-beam-grid-computing




On 25 October 2018 at 04:23, Jeff Klukas  wrote:

> Another option here would be to make the perl script operate on batches.
> Your DoFn could then store the records to a buffer rather than outputting
> them and then periodically flush the buffer, sending records through the
> perl script and sending to output.
>
> On Wed, Oct 24, 2018 at 3:03 PM Robert Bradshaw 
> wrote:
>
>> While one does want to watch out for expensive per-record operations,
>> this may still be preferable to (and cheaper than) setting up a server and
>> making RPC requests. It depends on the nature of the operation. If
>> executing the perl script is (say) 100ms of "startup" for 1ms of actually
>> processing $DATA, then you'll be wasting a lot of cycles and a server may
>> be the way to go, but if it's 1ms of startup for 100ms of processing $DATA
>> than this startup cost won't matter at all.
>>
>> If the startup cost is prohibitive, you could also start up a local
>> "server" on the worker in startBundle (or even setUp), and shut it down in
>> finishBundle, and communicate with it in your processElement.
>>
>> The other bit is actually shipping your perl script (and, more tricky,
>> its dependencies). Currently that's very runner-dependent, and typically
>> you end up packing it as data in your jars and then trying to
>> unpack/install it on the workers at runtime. One of the goals of
>> https://beam.apache.org/contribute/portability/ is to make this easier,
>> specifically, you can set up your worker environment as a docker container
>> with everything you need and this will get used as the environment in which
>> your DoFns are executed.
>>
>>
>> On Wed, Oct 24, 2018 at 6:48 AM Sobhan Badiozamany <
>> sobhan.badiozam...@leovegas.com> wrote:
>>
>>> Hi Nima,
>>>
>>> I think the answer depends on the use-case, but what you suggest is on
>>> the list of practices that hurt scalability of pipelines as it will be an
>>> example of “Expensive Per-Record Operations”, look it up here:
>>> https://cloud.google.com/blog/products/gcp/writing-dataflow-
>>> pipelines-with-scalability-in-mind
>>>
>>> Cheers,
>>> Sobi
>>>
>>> Sent from my iPhone
>>>
>>> On Oct 23, 2018, at 23:35, Nima Mousavi  wrote:
>>>
>>> Hi,
>>>
>>> We have a dataflow pipeline written in Apache python beam, and are
>>> wondering if we can run a third party code (written in perl) in the
>>> pipeline. We basically want to run
>>>
>>> perl myscript.pl $DATA
>>>
>>> for every DATA in a PCollection passed to a DoFn
>>>
>>> and write the result back into Bigquery.  We could have setup a server
>>> for myscript.pl, and send HTTP/RPC request to the server from each
>>> worker instead. But we are wondering if it is possible to run the script
>>> directly inside the Beam worker? Or even through a docker container
>>> packaging our perl script? If yes, how? what do you think of this approach?
>>> Any caveat we should be aware of?
>>>
>>> Thanks!
>>>
>>>




Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
Yes, that would be fine.

The user could then use a ParDo which outputs to a DLQ for things it can't
parse the timestamp for and use outputWithTimestamp[1] for everything else.

1:
https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
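
For illustration, a rough sketch of that ParDo (kafkaValues stands in for the
PCollection of raw key/value pairs read from Kafka, and parseEventTime() is a
hypothetical parser):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;

final TupleTag<KV<String, String>> okTag = new TupleTag<KV<String, String>>() {};
final TupleTag<KV<String, String>> deadLetterTag = new TupleTag<KV<String, String>>() {};

PCollectionTuple routed =
    kafkaValues.apply(
        "ExtractTimestampOrDeadLetter",
        ParDo.of(
                new DoFn<KV<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    try {
                      Instant eventTime = parseEventTime(c.element().getValue()); // hypothetical
                      c.outputWithTimestamp(okTag, c.element(), eventTime); // [1] above
                    } catch (Exception e) {
                      // Keeps the source-assigned (minimum) timestamp.
                      c.output(deadLetterTag, c.element());
                    }
                  }
                })
            .withOutputTags(okTag, TupleTagList.of(deadLetterTag)));

// routed.get(okTag) is the parsed stream; routed.get(deadLetterTag) is the DLQ stream.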

On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:

> Thanks. So returning  min timestamp is OK, right (assuming application
> fine is with what it means)?
>
> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>
>> All records in Apache Beam have a timestamp. The default timestamp is the
>> min timestamp defined here:
>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>
>>
>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi  wrote:
>>
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>>>
 You would have to return min timestamp for all records otherwise the
 watermark may have advanced and you would be outputting records that are
 droppably late.

>>>
>>> That would be fine I guess. What’s the timestamp for a record that
>>> doesn’t have one?
>>>
>>>
 On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
 wrote:

> To be clear, returning min_timestamp for unparsable records shound not
> affect the watermark.
>
> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
> wrote:
>
>> How about returning min_timestamp? The would be dropped or redirected
>> by the ParDo after that.
>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API,
>> is this pipeline defined under kafkaio package?
>>
>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
>> wrote:
>>
>>> In this case, the user is attempting to handle errors when parsing
>>> the timestamp. The timestamp controls the watermark for the
>>> UnboundedSource, how would they control the watermark in a downstream 
>>> ParDo?
>>>
>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>>> wrote:
>>>
 On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Ah nice. Yeah, if user can return full bytes instead of applying a
> function that would result in an exception,  this can be extracted by 
> a
> ParDo down the line.
>

 KafkaIO does return bytes, and I think most sources should, unless
 there is a good reason not to.
 Given that, do we think Beam should provide a tranform that makes
 to simpler to handle deadletter output? I think there was a thread 
 about it
 in the past.


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
> jcgarc...@gmail.com> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple afert
>> that you can extract your Success Records (TupleTag) and your 
>> DeadLetter
>> records(TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>> 05:18:
>>
>>> User can read serialized bytes from KafkaIO and deserialize
>>> explicitly in a ParDo, which gives complete control on how to 
>>> handle record
>>> errors. This is I would do if I need to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be
>>> convenient for users in many such scenarios. This is simpler than 
>>> each
>>> source supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Given that KafkaIO uses UnboundeSource framework, this is
 probably not something that can easily be supported. We might be 
 able to
 support similar features when we have Kafka on top of Splittable 
 DoFn
 though.

>>> So feel free to create a feature request JIRA for this.

 Thanks,
 Cham

 On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
 wrote:

> This is a great question. I've added the dev list to be sure
> it gets noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
> tobias.kay...@ricardo.ch> wrote:
>
>>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline
>> that uses a KafkaIO
>> connector for it's input? As
>> 

Re: Is it possible to run a perl script in Dataflow worker?

2018-10-24 Thread Jeff Klukas
Another option here would be to make the perl script operate on batches.
Your DoFn could then store the records in a buffer rather than outputting
them, and then periodically flush the buffer, sending records through the
perl script and emitting the results.
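
For concreteness, a rough Java-flavored sketch of that buffering pattern (the
original pipeline is Python, where start_bundle/finish_bundle are the
analogues; runPerlBatch() is a hypothetical helper that shells out once per
batch):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;

// Sketch: buffer elements and invoke the perl script once per batch instead of
// once per record; flush whatever remains when the bundle finishes.
class BatchedPerlFn extends DoFn<String, String> {
  private static final int MAX_BATCH_SIZE = 100;
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    buffer.add(c.element());
    if (buffer.size() >= MAX_BATCH_SIZE) {
      for (String result : runPerlBatch(buffer)) {
        c.output(result);
      }
      buffer.clear();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) throws Exception {
    for (String result : runPerlBatch(buffer)) {
      // The timestamp/window used for the flushed output is an assumption here.
      c.output(result, Instant.now(), GlobalWindow.INSTANCE);
    }
    buffer.clear();
  }

  private List<String> runPerlBatch(List<String> batch) throws Exception {
    return new ArrayList<>(); // hypothetical: run "perl myscript.pl" once over the whole batch
  }
}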

On Wed, Oct 24, 2018 at 3:03 PM Robert Bradshaw  wrote:

> While one does want to watch out for expensive per-record operations, this
> may still be preferable to (and cheaper than) setting up a server and
> making RPC requests. It depends on the nature of the operation. If
> executing the perl script is (say) 100ms of "startup" for 1ms of actually
> processing $DATA, then you'll be wasting a lot of cycles and a server may
> be the way to go, but if it's 1ms of startup for 100ms of processing $DATA
> than this startup cost won't matter at all.
>
> If the startup cost is prohibitive, you could also start up a local
> "server" on the worker in startBundle (or even setUp), and shut it down in
> finishBundle, and communicate with it in your processElement.
>
> The other bit is actually shipping your perl script (and, more tricky, its
> dependencies). Currently that's very runner-dependent, and typically you
> end up packing it as data in your jars and then trying to unpack/install it
> on the workers at runtime. One of the goals of
> https://beam.apache.org/contribute/portability/ is to make this easier,
> specifically, you can set up your worker environment as a docker container
> with everything you need and this will get used as the environment in which
> your DoFns are executed.
>
>
> On Wed, Oct 24, 2018 at 6:48 AM Sobhan Badiozamany <
> sobhan.badiozam...@leovegas.com> wrote:
>
>> Hi Nima,
>>
>> I think the answer depends on the use-case, but what you suggest is on
>> the list of practices that hurt scalability of pipelines as it will be an
>> example of “Expensive Per-Record Operations”, look it up here:
>>
>> https://cloud.google.com/blog/products/gcp/writing-dataflow-pipelines-with-scalability-in-mind
>>
>> Cheers,
>> Sobi
>>
>> Sent from my iPhone
>>
>> On Oct 23, 2018, at 23:35, Nima Mousavi  wrote:
>>
>> Hi,
>>
>> We have a dataflow pipeline written in Apache python beam, and are
>> wondering if we can run a third party code (written in perl) in the
>> pipeline. We basically want to run
>>
>> perl myscript.pl $DATA
>>
>> for every DATA in a PCollection passed to a DoFn
>>
>> and write the result back into Bigquery.  We could have setup a server
>> for myscript.pl, and send HTTP/RPC request to the server from each
>> worker instead. But we are wondering if it is possible to run the script
>> directly inside the Beam worker? Or even through a docker container
>> packaging our perl script? If yes, how? what do you think of this approach?
>> Any caveat we should be aware of?
>>
>> Thanks!
>>
>>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
Thanks. So returning min timestamp is OK, right (assuming the application is
fine with what it means)?

On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:

> All records in Apache Beam have a timestamp. The default timestamp is the
> min timestamp defined here:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>
>
> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi  wrote:
>
>>
>>
>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>>
>>> You would have to return min timestamp for all records otherwise the
>>> watermark may have advanced and you would be outputting records that are
>>> droppably late.
>>>
>>
>> That would be fine I guess. What’s the timestamp for a record that
>> doesn’t have one?
>>
>>
>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>>> wrote:
>>>
 To be clear, returning min_timestamp for unparsable records shound not
 affect the watermark.

 On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
 wrote:

> How about returning min_timestamp? The would be dropped or redirected
> by the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
> this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>
>> In this case, the user is attempting to handle errors when parsing
>> the timestamp. The timestamp controls the watermark for the
>> UnboundedSource, how would they control the watermark in a downstream 
>> ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>> wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Ah nice. Yeah, if user can return full bytes instead of applying a
 function that would result in an exception,  this can be extracted by a
 ParDo down the line.

>>>
>>> KafkaIO does return bytes, and I think most sources should, unless
>>> there is a good reason not to.
>>> Given that, do we think Beam should provide a tranform that makes to
>>> simpler to handle deadletter output? I think there was a thread about 
>>> it in
>>> the past.
>>>
>>>

 On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
 jcgarc...@gmail.com> wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple afert
> that you can extract your Success Records (TupleTag) and your 
> DeadLetter
> records(TupleTag) and do whatever you want with them.
>
>
> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
> 05:18:
>
>> User can read serialized bytes from KafkaIO and deserialize
>> explicitly in a ParDo, which gives complete control on how to handle 
>> record
>> errors. This is I would do if I need to in my pipeline.
>>
>> If there is a transform in Beam that does this, it could be
>> convenient for users in many such scenarios. This is simpler than 
>> each
>> source supporting it explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Given that KafkaIO uses UnboundeSource framework, this is
>>> probably not something that can easily be supported. We might be 
>>> able to
>>> support similar features when we have Kafka on top of Splittable 
>>> DoFn
>>> though.
>>>
>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>>> wrote:
>>>
 This is a great question. I've added the dev list to be sure it
 gets noticed by whoever may know best.

 Kenn

 On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
 tobias.kay...@ricardo.ch> wrote:

>
> Hi,
>
> Is there a way to get a Deadletter Output from a pipeline that
> uses a KafkaIO
> connector for it's input? As
> `TimestampPolicyFactory.withTimestampFn()` takes
> only a SerializableFunction and not a ParDo, how would I be
> able to produce a
> Deadletter output from it?
>
> I have the following pipeline defined that reads from a
> KafkaIO input:
>
> pipeline.apply(
>   KafkaIO.read()
> .withBootstrapServers(bootstrap)
> .withTopics(topics)
> .withKeyDeserializer(StringDeserializer.class)
> 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
All records in Apache Beam have a timestamp. The default timestamp is the
min timestamp defined here:
https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48


On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi  wrote:

>
>
> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>
>> You would have to return min timestamp for all records otherwise the
>> watermark may have advanced and you would be outputting records that are
>> droppably late.
>>
>
> That would be fine I guess. What’s the timestamp for a record that doesn’t
> have one?
>
>
>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi  wrote:
>>
>>> To be clear, returning min_timestamp for unparsable records shound not
>>> affect the watermark.
>>>
>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>>> wrote:
>>>
 How about returning min_timestamp? The would be dropped or redirected
 by the ParDo after that.
 Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
 this pipeline defined under kafkaio package?

 On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:

> In this case, the user is attempting to handle errors when parsing the
> timestamp. The timestamp controls the watermark for the UnboundedSource,
> how would they control the watermark in a downstream ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
> wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>> function that would result in an exception,  this can be extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should, unless
>> there is a good reason not to.
>> Given that, do we think Beam should provide a tranform that makes to
>> simpler to handle deadletter output? I think there was a thread about it 
>> in
>> the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>> jcgarc...@gmail.com> wrote:
>>>
 As Raghu said,

 Just apply a regular ParDo and return a PCollectionTuple afert that
 you can extract your Success Records (TupleTag) and your DeadLetter
 records(TupleTag) and do whatever you want with them.


 Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
 05:18:

> User can read serialized bytes from KafkaIO and deserialize
> explicitly in a ParDo, which gives complete control on how to handle 
> record
> errors. This is I would do if I need to in my pipeline.
>
> If there is a transform in Beam that does this, it could be
> convenient for users in many such scenarios. This is simpler than each
> source supporting it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Given that KafkaIO uses UnboundeSource framework, this is
>> probably not something that can easily be supported. We might be 
>> able to
>> support similar features when we have Kafka on top of Splittable DoFn
>> though.
>>
> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>> wrote:
>>
>>> This is a great question. I've added the dev list to be sure it
>>> gets noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch> wrote:
>>>

 Hi,

 Is there a way to get a Deadletter Output from a pipeline that
 uses a KafkaIO
 connector for it's input? As
 `TimestampPolicyFactory.withTimestampFn()` takes
 only a SerializableFunction and not a ParDo, how would I be
 able to produce a
 Deadletter output from it?

 I have the following pipeline defined that reads from a KafkaIO
 input:

 pipeline.apply(
   KafkaIO.read()
 .withBootstrapServers(bootstrap)
 .withTopics(topics)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(ConfigurableDeserializer.class)
 .updateConsumerProperties(

 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
 inputMessagesConfig))

 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
 "earliest"))
 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:

> You would have to return min timestamp for all records otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.
>

That would be fine I guess. What’s the timestamp for a record that doesn’t
have one?


> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi  wrote:
>
>> To be clear, returning min_timestamp for unparsable records shound not
>> affect the watermark.
>>
>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:
>>
>>> How about returning min_timestamp? The would be dropped or redirected by
>>> the ParDo after that.
>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
>>> this pipeline defined under kafkaio package?
>>>
>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>>>
 In this case, the user is attempting to handle errors when parsing the
 timestamp. The timestamp controls the watermark for the UnboundedSource,
 how would they control the watermark in a downstream ParDo?

 On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
 wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Ah nice. Yeah, if user can return full bytes instead of applying a
>> function that would result in an exception,  this can be extracted by a
>> ParDo down the line.
>>
>
> KafkaIO does return bytes, and I think most sources should, unless
> there is a good reason not to.
> Given that, do we think Beam should provide a tranform that makes to
> simpler to handle deadletter output? I think there was a thread about it 
> in
> the past.
>
>
>>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>> jcgarc...@gmail.com> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple afert that
>>> you can extract your Success Records (TupleTag) and your DeadLetter
>>> records(TupleTag) and do whatever you want with them.
>>>
>>>
>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>>> 05:18:
>>>
 User can read serialized bytes from KafkaIO and deserialize
 explicitly in a ParDo, which gives complete control on how to handle 
 record
 errors. This is I would do if I need to in my pipeline.

 If there is a transform in Beam that does this, it could be
 convenient for users in many such scenarios. This is simpler than each
 source supporting it explicitly.

 On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Given that KafkaIO uses UnboundeSource framework, this is probably
> not something that can easily be supported. We might be able to 
> support
> similar features when we have Kafka on top of Splittable DoFn though.
>
 So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
> wrote:
>
>> This is a great question. I've added the dev list to be sure it
>> gets noticed by whoever may know best.
>>
>> Kenn
>>
>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>> tobias.kay...@ricardo.ch> wrote:
>>
>>>
>>> Hi,
>>>
>>> Is there a way to get a Deadletter Output from a pipeline that
>>> uses a KafkaIO
>>> connector for it's input? As
>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>> only a SerializableFunction and not a ParDo, how would I be able
>>> to produce a
>>> Deadletter output from it?
>>>
>>> I have the following pipeline defined that reads from a KafkaIO
>>> input:
>>>
>>> pipeline.apply(
>>>   KafkaIO.read()
>>> .withBootstrapServers(bootstrap)
>>> .withTopics(topics)
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>> .updateConsumerProperties(
>>>
>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>> inputMessagesConfig))
>>>
>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>>> "earliest"))
>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>> "beam-consumers"))
>>>
>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
>>> "true"))
>>> .withTimestampPolicyFactory(
>>> TimestampPolicyFactory.withTimestampFn(
>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>> .withReadCommitted()
>>> .commitOffsetsInFinalize())
>>>
>>>

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
You would have to return the min timestamp for all records; otherwise the
watermark may have advanced and you would be outputting records that are
droppably late.

On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi  wrote:

> To be clear, returning min_timestamp for unparsable records shound not
> affect the watermark.
>
> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:
>
>> How about returning min_timestamp? The would be dropped or redirected by
>> the ParDo after that.
>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
>> this pipeline defined under kafkaio package?
>>
>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>>
>>> In this case, the user is attempting to handle errors when parsing the
>>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>>> how would they control the watermark in a downstream ParDo?
>>>
>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>>>
 On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Ah nice. Yeah, if user can return full bytes instead of applying a
> function that would result in an exception,  this can be extracted by a
> ParDo down the line.
>

 KafkaIO does return bytes, and I think most sources should, unless
 there is a good reason not to.
 Given that, do we think Beam should provide a tranform that makes to
 simpler to handle deadletter output? I think there was a thread about it in
 the past.


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
> jcgarc...@gmail.com> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple afert that
>> you can extract your Success Records (TupleTag) and your DeadLetter
>> records(TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>> 05:18:
>>
>>> User can read serialized bytes from KafkaIO and deserialize
>>> explicitly in a ParDo, which gives complete control on how to handle 
>>> record
>>> errors. This is I would do if I need to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be
>>> convenient for users in many such scenarios. This is simpler than each
>>> source supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Given that KafkaIO uses UnboundeSource framework, this is probably
 not something that can easily be supported. We might be able to support
 similar features when we have Kafka on top of Splittable DoFn though.

>>> So feel free to create a feature request JIRA for this.

 Thanks,
 Cham

 On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
 wrote:

> This is a great question. I've added the dev list to be sure it
> gets noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
> tobias.kay...@ricardo.ch> wrote:
>
>>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline that
>> uses a KafkaIO
>> connector for it's input? As
>> `TimestampPolicyFactory.withTimestampFn()` takes
>> only a SerializableFunction and not a ParDo, how would I be able
>> to produce a
>> Deadletter output from it?
>>
>> I have the following pipeline defined that reads from a KafkaIO
>> input:
>>
>> pipeline.apply(
>>   KafkaIO.read()
>> .withBootstrapServers(bootstrap)
>> .withTopics(topics)
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializer(ConfigurableDeserializer.class)
>> .updateConsumerProperties(
>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>> inputMessagesConfig))
>>
>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>> "earliest"))
>> .updateConsumerProperties(ImmutableMap.of("group.id",
>> "beam-consumers"))
>>
>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
>> "true"))
>> .withTimestampPolicyFactory(
>> TimestampPolicyFactory.withTimestampFn(
>> new MessageTimestampExtractor(inputMessagesConfig)))
>> .withReadCommitted()
>> .commitOffsetsInFinalize())
>>
>>
>> and I like to get deadletter outputs when my timestamp extraction
>> fails.
>>
>> Best,
>> Tobi
>>
>>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
To be clear, returning min_timestamp for unparsable records should not
affect the watermark.

On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:

> How about returning min_timestamp? The would be dropped or redirected by
> the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
> this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>
>> In this case, the user is attempting to handle errors when parsing the
>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>> how would they control the watermark in a downstream ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
>>> wrote:
>>>
 Ah nice. Yeah, if user can return full bytes instead of applying a
 function that would result in an exception,  this can be extracted by a
 ParDo down the line.

>>>
>>> KafkaIO does return bytes, and I think most sources should, unless there
>>> is a good reason not to.
>>> Given that, do we think Beam should provide a tranform that makes to
>>> simpler to handle deadletter output? I think there was a thread about it in
>>> the past.
>>>
>>>

 On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
 jcgarc...@gmail.com> wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple afert that
> you can extract your Success Records (TupleTag) and your DeadLetter
> records(TupleTag) and do whatever you want with them.
>
>
> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
> 05:18:
>
>> User can read serialized bytes from KafkaIO and deserialize
>> explicitly in a ParDo, which gives complete control on how to handle 
>> record
>> errors. This is I would do if I need to in my pipeline.
>>
>> If there is a transform in Beam that does this, it could be
>> convenient for users in many such scenarios. This is simpler than each
>> source supporting it explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Given that KafkaIO uses UnboundeSource framework, this is probably
>>> not something that can easily be supported. We might be able to support
>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>
>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>>> wrote:
>>>
 This is a great question. I've added the dev list to be sure it
 gets noticed by whoever may know best.

 Kenn

 On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
 tobias.kay...@ricardo.ch> wrote:

>
> Hi,
>
> Is there a way to get a Deadletter Output from a pipeline that
> uses a KafkaIO
> connector for it's input? As
> `TimestampPolicyFactory.withTimestampFn()` takes
> only a SerializableFunction and not a ParDo, how would I be able
> to produce a
> Deadletter output from it?
>
> I have the following pipeline defined that reads from a KafkaIO
> input:
>
> pipeline.apply(
>   KafkaIO.read()
> .withBootstrapServers(bootstrap)
> .withTopics(topics)
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(ConfigurableDeserializer.class)
> .updateConsumerProperties(
> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> inputMessagesConfig))
> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
> "earliest"))
> .updateConsumerProperties(ImmutableMap.of("group.id",
> "beam-consumers"))
>
> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
> "true"))
> .withTimestampPolicyFactory(
> TimestampPolicyFactory.withTimestampFn(
> new MessageTimestampExtractor(inputMessagesConfig)))
> .withReadCommitted()
> .commitOffsetsInFinalize())
>
>
> and I like to get deadletter outputs when my timestamp extraction
> fails.
>
> Best,
> Tobi
>
>


Re: Is it possible to run a perl script in Dataflow worker?

2018-10-24 Thread Robert Bradshaw
While one does want to watch out for expensive per-record operations, this
may still be preferable to (and cheaper than) setting up a server and
making RPC requests. It depends on the nature of the operation. If
executing the perl script is (say) 100ms of "startup" for 1ms of actually
processing $DATA, then you'll be wasting a lot of cycles and a server may
be the way to go, but if it's 1ms of startup for 100ms of processing $DATA
then this startup cost won't matter at all.

If the startup cost is prohibitive, you could also start up a local
"server" on the worker in startBundle (or even setUp), and shut it down in
finishBundle, and communicate with it in your processElement.
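
A rough Java-flavored sketch of that local-server pattern (the script path and
the one-line-in/one-line-out protocol are assumptions; the Python SDK's
setup/teardown hooks work analogously):

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch: start one long-lived perl process per DoFn instance and talk to it over
// stdin/stdout, so the startup cost is paid once rather than once per record.
class PerlServerFn extends DoFn<String, String> {
  private transient Process perlServer;
  private transient BufferedWriter toServer;
  private transient BufferedReader fromServer;

  @Setup
  public void setup() throws IOException {
    perlServer = new ProcessBuilder("perl", "myserver.pl").start(); // hypothetical script
    toServer = new BufferedWriter(new OutputStreamWriter(perlServer.getOutputStream(), UTF_8));
    fromServer = new BufferedReader(new InputStreamReader(perlServer.getInputStream(), UTF_8));
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    toServer.write(c.element());
    toServer.newLine();
    toServer.flush();
    c.output(fromServer.readLine()); // assumes one response line per input line
  }

  @Teardown
  public void teardown() {
    if (perlServer != null) {
      perlServer.destroy();
    }
  }
}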

The other bit is actually shipping your perl script (and, more tricky, its
dependencies). Currently that's very runner-dependent, and typically you
end up packing it as data in your jars and then trying to unpack/install it
on the workers at runtime. One of the goals of
https://beam.apache.org/contribute/portability/ is to make this easier,
specifically, you can set up your worker environment as a docker container
with everything you need and this will get used as the environment in which
your DoFns are executed.


On Wed, Oct 24, 2018 at 6:48 AM Sobhan Badiozamany <
sobhan.badiozam...@leovegas.com> wrote:

> Hi Nima,
>
> I think the answer depends on the use-case, but what you suggest is on the
> list of practices that hurt scalability of pipelines as it will be an
> example of “Expensive Per-Record Operations”, look it up here:
>
> https://cloud.google.com/blog/products/gcp/writing-dataflow-pipelines-with-scalability-in-mind
>
> Cheers,
> Sobi
>
> Sent from my iPhone
>
> On Oct 23, 2018, at 23:35, Nima Mousavi  wrote:
>
> Hi,
>
> We have a dataflow pipeline written in Apache python beam, and are
> wondering if we can run a third party code (written in perl) in the
> pipeline. We basically want to run
>
> perl myscript.pl $DATA
>
> for every DATA in a PCollection passed to a DoFn
>
> and write the result back into Bigquery.  We could have setup a server for
> myscript.pl, and send HTTP/RPC request to the server from each worker
> instead. But we are wondering if it is possible to run the script directly
> inside the Beam worker? Or even through a docker container packaging our
> perl script? If yes, how? what do you think of this approach? Any caveat we
> should be aware of?
>
> Thanks!
>
>


Re: write to a kafka topic that is set in data

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 10:47 AM Raghu Angadi  wrote:

> My bad Alexey, I will review today. I had skimmed through the patch on my
> phone. You are right, exactly-once sink support is not required for now.
>



> It is a quite a different beast and necessarily coupled with transactions
> on a specific topic-partitions for correctness.
>
Actually, I take this back. I don't think it is coupled with the output topic
and partitions. It might just work (assuming Kafka can handle individual
transactions spanning many topics well). As you mentioned, we would still
need to plumb it through. As such we don't know if exactly-once sink is
being used much... (I would love to hear about it if anyone is using it).


>
> The primary concern is with the API. The user provides a function to map
> an output record to its topic. We have found that such an API is usually
> problematic. E.g. what if the record does not encode enough information
> about topic? Say we want to select topic name based on aggregation window.
> It might be bit more code, but simpler to let the user decide topic for
> each record _before_ writing to the sink. E.g. it could be
> KafkaIO.Writer>.
> I wanted to think a little bit more about this, but didn't get around to
> it. I will comment on the PR today.
>
> thanks for the initiative and the PR.
> Raghu.
> On Wed, Oct 24, 2018 at 7:03 AM Alexey Romanenko 
> wrote:
>
>> I added a simple support of this for usual type of Kafka sink (PR:
>> https://github.com/apache/beam/pull/6776 , welcomed for review, btw :) )
>>
>> In the same time, there is another, more complicated, type of sink - EOS
>> (Exactly Once Sink). In this case the data is partitioned among fixed
>> number of shards and it creates one ShardWriter per shard. In its
>> order, ShardWriter depends on Kafka topic. So, seems that in case of
>> multiple and dynamic sink topics, we need to create new ShardWriter for
>> every new topic per shard,
>>
>> Is my assumption correct or I missed/misunderstood something?
>>
>> On 20 Oct 2018, at 01:21, Lukasz Cwik  wrote:
>>
>> Thanks Raghu, added starter and newbie labels to the issue.
>>
>> On Fri, Oct 19, 2018 at 4:20 PM Raghu Angadi  wrote:
>>
>>> It will be a good starter feature for someone interested in Beam &
>>> Kafka. Writer is very simple in Beam. It is little more than a ParDo.
>>>
>>> On Fri, Oct 19, 2018 at 3:37 PM Dmitry Minaev  wrote:
>>>
 Lukasz, I appreciate the quick response and filing the JIRA ticket.
 Thanks for the suggestion, unfortunately, I don't have a fixed number of
 topics. Still, we'll probably use your approach for a limited number of
 topics until the functionality is added, thank you!

 Thanks,
 Dmitry

 On Fri, Oct 19, 2018 at 2:53 PM Lukasz Cwik  wrote:

> If there are a fixed number of topics, you could partition your write
> by structuring your pipeline as such:
> ParDo(PartitionByTopic) > KafkaIO.write(topicA)
> \---> KafkaIO.write(topicB)
> \---> KafkaIO.write(...)
>
> There is no support currently for writing to Kafka dynamically based
> upon a destination that is part of the data.
> I filed https://issues.apache.org/jira/browse/BEAM-5798 for the issue.
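
For reference, a rough sketch of that fixed-topic structure (key/value types,
topic names, serializers, and the routing predicate are assumptions):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: split the PCollection per topic (simple filters here; a multi-output
// ParDo works too) and attach one KafkaIO.write() per topic.
static void writePerTopic(PCollection<KV<String, String>> records, String bootstrap) {
  records
      .apply("TenantA", Filter.by(kv -> kv.getKey().startsWith("tenantA"))) // assumed routing rule
      .apply("WriteDataFeed1",
          KafkaIO.<String, String>write()
              .withBootstrapServers(bootstrap)
              .withTopic("data_feed_1")
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(StringSerializer.class));

  records
      .apply("Others", Filter.by(kv -> !kv.getKey().startsWith("tenantA")))
      .apply("WriteDataFeed999",
          KafkaIO.<String, String>write()
              .withBootstrapServers(bootstrap)
              .withTopic("data_feed_999")
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(StringSerializer.class));
}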
>
> On Fri, Oct 19, 2018 at 2:05 PM mina...@gmail.com 
> wrote:
>
>> Hi guys!!
>>
>> I'm trying to find a way to write to a Kafka topic using
>> KafkaIO.write() But I need to be able to get topic name dynamically based
>> on the data received. For example, I would like to send data for one 
>> tenant
>> to topic "data_feed_1" and for another tenant to "topic data_feed_999".
>> I'm coming from Flink where it's possible via
>> KeyedSerializationSchema.getTargetTopic().
>> Is there anything similar in KafkaIO?
>>
>> Thanks,
>> Dmitry
>>
> --

 --
 Dmitry

>>>
>>


Re: write to a kafka topic that is set in data

2018-10-24 Thread Raghu Angadi
My bad, Alexey, I will review today. I had skimmed through the patch on my
phone. You are right, exactly-once sink support is not required for now. It
is quite a different beast and is necessarily coupled with transactions on
specific topic-partitions for correctness.

The primary concern is with the API. The user provides a function to map an
output record to its topic. We have found that such an API is usually
problematic, e.g. what if the record does not encode enough information
about the topic? Say we want to select the topic name based on the aggregation
window. It might be a bit more code, but it is simpler to let the user decide
the topic for each record _before_ writing to the sink. E.g. it could be
KafkaIO.Writer>.
I wanted to think a little bit more about this, but didn't get around to
it. I will comment on the PR today.

thanks for the initiative and the PR.
Raghu.
On Wed, Oct 24, 2018 at 7:03 AM Alexey Romanenko 
wrote:

> I added a simple support of this for usual type of Kafka sink (PR:
> https://github.com/apache/beam/pull/6776 , welcomed for review, btw :) )
>
> In the same time, there is another, more complicated, type of sink - EOS
> (Exactly Once Sink). In this case the data is partitioned among fixed
> number of shards and it creates one ShardWriter per shard. In its
> order, ShardWriter depends on Kafka topic. So, seems that in case of
> multiple and dynamic sink topics, we need to create new ShardWriter for
> every new topic per shard,
>
> Is my assumption correct or I missed/misunderstood something?
>
> On 20 Oct 2018, at 01:21, Lukasz Cwik  wrote:
>
> Thanks Raghu, added starter and newbie labels to the issue.
>
> On Fri, Oct 19, 2018 at 4:20 PM Raghu Angadi  wrote:
>
>> It will be a good starter feature for someone interested in Beam & Kafka.
>> Writer is very simple in Beam. It is little more than a ParDo.
>>
>> On Fri, Oct 19, 2018 at 3:37 PM Dmitry Minaev  wrote:
>>
>>> Lukasz, I appreciate the quick response and filing the JIRA ticket.
>>> Thanks for the suggestion, unfortunately, I don't have a fixed number of
>>> topics. Still, we'll probably use your approach for a limited number of
>>> topics until the functionality is added, thank you!
>>>
>>> Thanks,
>>> Dmitry
>>>
>>> On Fri, Oct 19, 2018 at 2:53 PM Lukasz Cwik  wrote:
>>>
 If there are a fixed number of topics, you could partition your write
 by structuring your pipeline as such:
 ParDo(PartitionByTopic) > KafkaIO.write(topicA)
 \---> KafkaIO.write(topicB)
 \---> KafkaIO.write(...)

 There is no support currently for writing to Kafka dynamically based
 upon a destination that is part of the data.
 I filed https://issues.apache.org/jira/browse/BEAM-5798 for the issue.

 On Fri, Oct 19, 2018 at 2:05 PM mina...@gmail.com 
 wrote:

> Hi guys!!
>
> I'm trying to find a way to write to a Kafka topic using
> KafkaIO.write() But I need to be able to get topic name dynamically based
> on the data received. For example, I would like to send data for one 
> tenant
> to topic "data_feed_1" and for another tenant to "topic data_feed_999".
> I'm coming from Flink where it's possible via
> KeyedSerializationSchema.getTargetTopic().
> Is there anything similar in KafkaIO?
>
> Thanks,
> Dmitry
>
 --
>>>
>>> --
>>> Dmitry
>>>
>>
>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
How about returning min_timestamp? Those records would be dropped or
redirected by the ParDo after that.
Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is this
pipeline defined under the kafkaio package?
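For illustration, a minimal sketch of such a timestamp function, pretending the
record value is itself an ISO-8601 timestamp string (replace Instant.parse with
whatever parsing the application really does):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

SerializableFunction<KafkaRecord<String, String>, Instant> timestampFn =
    record -> {
      try {
        // Application-specific timestamp extraction.
        return Instant.parse(record.getKV().getValue());
      } catch (Exception e) {
        // Unparsable record: fall back to the minimum timestamp so that a ParDo
        // right after the source can drop it or send it to a dead-letter output.
        return BoundedWindow.TIMESTAMP_MIN_VALUE;
      }
    };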

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:

> In this case, the user is attempting to handle errors when parsing the
> timestamp. The timestamp controls the watermark for the UnboundedSource,
> how would they control the watermark in a downstream ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
>> wrote:
>>
>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>> function that would result in an exception,  this can be extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should, unless there
>> is a good reason not to.
>> Given that, do we think Beam should provide a transform that makes it
>> simpler to handle deadletter output? I think there was a thread about it in
>> the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
>>> wrote:
>>>
 As Raghu said,

 Just apply a regular ParDo and return a PCollectionTuple; after that you
 can extract your Success Records (TupleTag) and your DeadLetter
 records(TupleTag) and do whatever you want with them.


 Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize explicitly
> in a ParDo, which gives complete control on how to handle record errors.
> This is what I would do if I needed to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient
> for users in many such scenarios. This is simpler than each source
> supporting it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>> not something that can easily be supported. We might be able to support
>> similar features when we have Kafka on top of Splittable DoFn though.
>>
> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>> wrote:
>>
>>> This is a great question. I've added the dev list to be sure it gets
>>> noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch> wrote:
>>>

 Hi,

 Is there a way to get a Deadletter Output from a pipeline that uses
 a KafkaIO
 connector for its input? As
 `TimestampPolicyFactory.withTimestampFn()` takes
 only a SerializableFunction and not a ParDo, how would I be able to
 produce a
 Deadletter output from it?

 I have the following pipeline defined that reads from a KafkaIO
 input:

 pipeline.apply(
   KafkaIO.read()
 .withBootstrapServers(bootstrap)
 .withTopics(topics)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(ConfigurableDeserializer.class)
 .updateConsumerProperties(
 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
 inputMessagesConfig))
 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
 "earliest"))
 .updateConsumerProperties(ImmutableMap.of("group.id",
 "beam-consumers"))
 .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
 "true"))
 .withTimestampPolicyFactory(
 TimestampPolicyFactory.withTimestampFn(
 new MessageTimestampExtractor(inputMessagesConfig)))
 .withReadCommitted()
 .commitOffsetsInFinalize())


 and I'd like to get deadletter outputs when my timestamp extraction
 fails.

 Best,
 Tobi




Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
In this case, the user is attempting to handle errors when parsing the
timestamp. The timestamp controls the watermark for the UnboundedSource,
how would they control the watermark in a downstream ParDo?
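For what it is worth, one way the source itself could keep that control is a
custom TimestampPolicy that gives unparsable records the minimum timestamp but
bases its watermark only on records it could parse. A rough sketch against
KafkaIO's TimestampPolicy API, with ISO-8601 parsing standing in for the
application's own logic:

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class ParsableRecordTimestampPolicy extends TimestampPolicy<String, String> {
  // The watermark only advances on records that could actually be parsed.
  private Instant lastParsedTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, String> record) {
    try {
      lastParsedTimestamp = Instant.parse(record.getKV().getValue());
      return lastParsedTimestamp;
    } catch (Exception e) {
      // Bad record: tag it with the minimum timestamp so a downstream ParDo can
      // redirect it to a dead-letter output; it does not influence the watermark.
      return BoundedWindow.TIMESTAMP_MIN_VALUE;
    }
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return lastParsedTimestamp;
  }
}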

On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
> wrote:
>
>> Ah nice. Yeah, if user can return full bytes instead of applying a
>> function that would result in an exception,  this can be extracted by a
>> ParDo down the line.
>>
>
> KafkaIO does return bytes, and I think most sources should, unless there
> is a good reason not to.
> Given that, do we think Beam should provide a transform that makes it
> simpler to handle deadletter output? I think there was a thread about it in
> the past.
>
>
>>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
>> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple; after that you
>>> can extract your Success Records (TupleTag) and your DeadLetter
>>> records(TupleTag) and do whatever you want with them.
>>>
>>>
>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>>>
 User can read serialized bytes from KafkaIO and deserialize explicitly
 in a ParDo, which gives complete control on how to handle record errors.
 This is what I would do if I needed to in my pipeline.

 If there is a transform in Beam that does this, it could be convenient
 for users in many such scenarios. This is simpler than each source
 supporting it explicitly.

 On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Given that KafkaIO uses the UnboundedSource framework, this is probably not
> something that can easily be supported. We might be able to support 
> similar
> features when we have Kafka on top of Splittable DoFn though.
>
 So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
> wrote:
>
>> This is a great question. I've added the dev list to be sure it gets
>> noticed by whoever may know best.
>>
>> Kenn
>>
>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>> tobias.kay...@ricardo.ch> wrote:
>>
>>>
>>> Hi,
>>>
>>> Is there a way to get a Deadletter Output from a pipeline that uses
>>> a KafkaIO
>>> connector for its input? As
>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>> only a SerializableFunction and not a ParDo, how would I be able to
>>> produce a
>>> Deadletter output from it?
>>>
>>> I have the following pipeline defined that reads from a KafkaIO
>>> input:
>>>
>>> pipeline.apply(
>>>   KafkaIO.read()
>>> .withBootstrapServers(bootstrap)
>>> .withTopics(topics)
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>> .updateConsumerProperties(
>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>> inputMessagesConfig))
>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>> "earliest"))
>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>> "beam-consumers"))
>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>> "true"))
>>> .withTimestampPolicyFactory(
>>> TimestampPolicyFactory.withTimestampFn(
>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>> .withReadCommitted()
>>> .commitOffsetsInFinalize())
>>>
>>>
>>> and I'd like to get deadletter outputs when my timestamp extraction
>>> fails.
>>>
>>> Best,
>>> Tobi
>>>
>>>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
wrote:

> Ah nice. Yeah, if user can return full bytes instead of applying a
> function that would result in an exception,  this can be extracted by a
> ParDo down the line.
>

KafkaIO does return bytes, and I think most sources should, unless there is
a good reason not to.
Given that, do we think Beam should provide a transform that makes it
simpler to handle deadletter output? I think there was a thread about it in
the past.
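For the sake of discussion, such a transform could look roughly like the sketch
below. This is purely hypothetical - nothing like it exists in Beam today - and
all the names are made up:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Applies a user-supplied parse function and routes failures to a second
// output instead of failing the bundle.
class ParseWithDeadLetters<InT, OutT> extends PTransform<PCollection<InT>, PCollectionTuple> {
  private final SerializableFunction<InT, OutT> parseFn;
  private final TupleTag<OutT> parsedTag;
  private final TupleTag<InT> deadLetterTag;

  ParseWithDeadLetters(SerializableFunction<InT, OutT> parseFn,
      TupleTag<OutT> parsedTag, TupleTag<InT> deadLetterTag) {
    this.parseFn = parseFn;
    this.parsedTag = parsedTag;
    this.deadLetterTag = deadLetterTag;
  }

  @Override
  public PCollectionTuple expand(PCollection<InT> input) {
    return input.apply(
        ParDo.of(new DoFn<InT, OutT>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                try {
                  c.output(parseFn.apply(c.element()));
                } catch (Exception e) {
                  // Keep the original input element for the dead-letter queue.
                  c.output(deadLetterTag, c.element());
                }
              }
            })
            .withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));
  }
}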


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple; after that you
>> can extract your Success Records (TupleTag) and your DeadLetter
>> records(TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>>
>>> User can read serialized bytes from KafkaIO and deserialize explicitly
>>> in a ParDo, which gives complete control on how to handle record errors.
>>> This is what I would do if I needed to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be convenient
>>> for users in many such scenarios. This is simpler than each source
>>> supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
>>> wrote:
>>>
 Given that KafkaIO uses the UnboundedSource framework, this is probably not
 something that can easily be supported. We might be able to support similar
 features when we have Kafka on top of Splittable DoFn though.

>>> So feel free to create a feature request JIRA for this.

 Thanks,
 Cham

 On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:

> This is a great question. I've added the dev list to be sure it gets
> noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
> tobias.kay...@ricardo.ch> wrote:
>
>>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline that uses a
>> KafkaIO
>> connector for its input? As
>> `TimestampPolicyFactory.withTimestampFn()` takes
>> only a SerializableFunction and not a ParDo, how would I be able to
>> produce a
>> Deadletter output from it?
>>
>> I have the following pipeline defined that reads from a KafkaIO input:
>>
>> pipeline.apply(
>>   KafkaIO.read()
>> .withBootstrapServers(bootstrap)
>> .withTopics(topics)
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializer(ConfigurableDeserializer.class)
>> .updateConsumerProperties(
>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>> inputMessagesConfig))
>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>> "earliest"))
>> .updateConsumerProperties(ImmutableMap.of("group.id",
>> "beam-consumers"))
>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>> "true"))
>> .withTimestampPolicyFactory(
>> TimestampPolicyFactory.withTimestampFn(
>> new MessageTimestampExtractor(inputMessagesConfig)))
>> .withReadCommitted()
>> .commitOffsetsInFinalize())
>>
>>
>> and I'd like to get deadletter outputs when my timestamp extraction
>> fails.
>>
>> Best,
>> Tobi
>>
>>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Chamikara Jayalath
Ah nice. Yeah, if the user can return the full bytes instead of applying a
function that would result in an exception, this can be extracted by a ParDo
down the line.
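A minimal sketch of that setup, with placeholder broker and topic names: read
raw bytes so nothing can fail inside the source, and leave all parsing (and
dead-lettering) to a ParDo after it:

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Nothing here can throw on a bad payload; every record reaches the pipeline
// as raw bytes and the downstream ParDo decides what is parsable and what is not.
pipeline.apply(
    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("kafka1:9092")          // placeholder
        .withTopic("topic")                           // placeholder
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withoutMetadata());                          // PCollection<KV<byte[], byte[]>>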

On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple; after that you
> can extract your Success Records (TupleTag) and your DeadLetter
> records(TupleTag) and do whatever you want with them.
>
>
> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>
>> User can read serialized bytes from KafkaIO and deserialize explicitly in
>> a ParDo, which gives complete control on how to handle record errors. This
>> is what I would do if I needed to in my pipeline.
>>
>> If there is a transform in Beam that does this, it could be convenient
>> for users in many such scenarios. This is simpler than each source
>> supporting it explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
>> wrote:
>>
>>> Given that KafkaIO uses the UnboundedSource framework, this is probably not
>>> something that can easily be supported. We might be able to support similar
>>> features when we have Kafka on top of Splittable DoFn though.
>>>
>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:
>>>
 This is a great question. I've added the dev list to be sure it gets
 noticed by whoever may know best.

 Kenn

 On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
 tobias.kay...@ricardo.ch> wrote:

>
> Hi,
>
> Is there a way to get a Deadletter Output from a pipeline that uses a
> KafkaIO
> connector for its input? As
> `TimestampPolicyFactory.withTimestampFn()` takes
> only a SerializableFunction and not a ParDo, how would I be able to
> produce a
> Deadletter output from it?
>
> I have the following pipeline defined that reads from a KafkaIO input:
>
> pipeline.apply(
>   KafkaIO.read()
> .withBootstrapServers(bootstrap)
> .withTopics(topics)
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(ConfigurableDeserializer.class)
> .updateConsumerProperties(
> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> inputMessagesConfig))
> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
> "earliest"))
> .updateConsumerProperties(ImmutableMap.of("group.id",
> "beam-consumers"))
> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
> "true"))
> .withTimestampPolicyFactory(
> TimestampPolicyFactory.withTimestampFn(
> new MessageTimestampExtractor(inputMessagesConfig)))
> .withReadCommitted()
> .commitOffsetsInFinalize())
>
>
> and I'd like to get deadletter outputs when my timestamp extraction
> fails.
>
> Best,
> Tobi
>
>


Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Maximilian Michels
The FlinkRunner uses a hash function (MurmurHash) on each key, which
places the keys somewhere in the hash space. The hash space (2^32) is split
among the partitions (5 in your case). Given enough keys, the chance
increases that they are spread evenly.


This should be similar to what the other Runners do.
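To illustrate why a low shard count hurts: with 5 random shard keys hashed into
5 partitions, the chance that every partition gets exactly one key is only
roughly 5!/5^5, i.e. about 3.8%. A small stand-alone sketch of the bucket
assignment, using Guava's murmur3 as a stand-in for the runner's actual hash:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

// Hash a handful of shard keys into 5 partitions; several keys can easily land
// in the same partition, which leaves some workers without any shard to write.
int numPartitions = 5;
for (int shard = 0; shard < 5; shard++) {
  String key = "shard-" + shard;                     // placeholder shard key
  int hash = Hashing.murmur3_32().hashString(key, StandardCharsets.UTF_8).asInt();
  long unsigned = hash & 0xFFFFFFFFL;                // map to [0, 2^32)
  int partition = (int) (unsigned * numPartitions / (1L << 32));
  System.out.println(key + " -> partition " + partition);
}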

On 24.10.18 10:58, Jozef Vilcek wrote:


So if I run 5 workers with 50 shards, I end up with:

Duration        Bytes received    Records received
  2m 39s        900 MB            465,525
  2m 39s       1.76 GB            930,720
  2m 39s        789 MB            407,315
  2m 39s       1.32 GB            698,262
  2m 39s        788 MB            407,310

Still not good but better than with 5 shards where some workers did not 
participate at all.

So, problem is in some layer which distributes keys / shards among workers?

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax wrote:


withNumShards(5) generates 5 random shards. It turns out that
statistically when you generate 5 random shards and you have 5
works, the probability is reasonably high that some workers will get
more than one shard (and as a result not all workers will
participate). Are you able to set the number of shards larger than 5?

On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

cc (dev)

I tried to run the example with FlinkRunner in batch mode and
received again bad data spread among the workers.

When I tried to remove number of shards for batch mode in above
example, pipeline crashed before launch

Caused by: java.lang.IllegalStateException: Inputs to Flatten
had incompatible triggers:

AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour,
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(





On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

Hi Max,

I forgot to mention that example is run in streaming mode,
therefore I can not do writes without specifying shards.
FileIO explicitly asks for them.

I am not sure where the problem is. FlinkRunner is only one
I used.

On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:

Hi Jozef,

This does not look like a FlinkRunner related problem,
but is caused by
the `WriteFiles` sharding logic. It assigns keys and
does a Reshuffle
which apparently does not lead to good data spread in
your case.

Do you see the same behavior without `withNumShards(5)`?

Thanks,
Max

On 22.10.18 11:57, Jozef Vilcek wrote:
 > Hello,
 >
 > I am having some trouble to get a balanced write via
FileIO. Workers at
 > the shuffle side where data per window fire are
written to the
 > filesystem receive unbalanced number of events.
 >
 > Here is a naive code example:
 >
 >      val read = KafkaIO.read()
 >          .withTopic("topic")
 >          .withBootstrapServers("kafka1:9092")
 > 
.withKeyDeserializer(classOf[ByteArrayDeserializer])
 > 
.withValueDeserializer(classOf[ByteArrayDeserializer])

 >          .withProcessingTime()
 >
 >      pipeline
 >          .apply(read)
 >          .apply(MapElements.via(new
 > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]],
String]() {
 >            override def apply(input:
KafkaRecord[Array[Byte],
 > Array[Byte]]): String = {
 >              new String(input.getKV.getValue, "UTF-8")
 >            }
 >          }))
 >
 >
 >

.apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
 >              .triggering(AfterWatermark.pastEndOfWindow()
 > 
.withEarlyFirings(AfterPane.elementCountAtLeast(4))
 >

Re: write to a kafka topic that is set in data

2018-10-24 Thread Alexey Romanenko
I added simple support for this for the usual type of Kafka sink (PR:
https://github.com/apache/beam/pull/6776 , review welcome, btw :) )

At the same time, there is another, more complicated type of sink - EOS
(Exactly Once Sink). In this case the data is partitioned among a fixed number of
shards and one ShardWriter is created per shard. In turn, each ShardWriter
depends on a Kafka topic. So it seems that in the case of multiple and dynamic
sink topics, we would need to create a new ShardWriter for every new topic per shard.

Is my assumption correct, or did I miss/misunderstand something?
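If it does come to that, the bookkeeping would roughly amount to caching
writers per (shard, topic) pair instead of per shard, creating them lazily as
new topics show up in the data. A hypothetical sketch only - nothing here is
actual KafkaIO code, and W just stands in for the EOS sink's internal ShardWriter:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.beam.sdk.values.KV;

// Cache one writer per (shard, topic) and create missing ones on demand.
class ShardTopicWriters<W> {
  private final Map<KV<Integer, String>, W> writers = new HashMap<>();
  private final BiFunction<Integer, String, W> factory;   // wraps the real writer setup

  ShardTopicWriters(BiFunction<Integer, String, W> factory) {
    this.factory = factory;
  }

  W get(int shard, String topic) {
    return writers.computeIfAbsent(KV.of(shard, topic), k -> factory.apply(shard, topic));
  }
}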

> On 20 Oct 2018, at 01:21, Lukasz Cwik  wrote:
> 
> Thanks Raghu, added starter and newbie labels to the issue.
> 
> On Fri, Oct 19, 2018 at 4:20 PM Raghu Angadi wrote:
> It will be a good starter feature for someone interested in Beam & Kafka. 
> Writer is very simple in Beam. It is little more than a ParDo. 
> 
> On Fri, Oct 19, 2018 at 3:37 PM Dmitry Minaev wrote:
> Lukasz, I appreciate the quick response and filing the JIRA ticket. Thanks 
> for the suggestion, unfortunately, I don't have a fixed number of topics. 
> Still, we'll probably use your approach for a limited number of topics until 
> the functionality is added, thank you!
> 
> Thanks,
> Dmitry
> 
> On Fri, Oct 19, 2018 at 2:53 PM Lukasz Cwik wrote:
> If there are a fixed number of topics, you could partition your write by 
> structuring your pipeline as such:
> ParDo(PartitionByTopic) > KafkaIO.write(topicA)
> \---> KafkaIO.write(topicB)
> \---> KafkaIO.write(...)
> 
> There is no support currently for writing to Kafka dynamically based upon a 
> destination that is part of the data.
> I filed https://issues.apache.org/jira/browse/BEAM-5798 for the issue.
> 
> On Fri, Oct 19, 2018 at 2:05 PM mina...@gmail.com wrote:
> Hi guys!!
> 
> I'm trying to find a way to write to a Kafka topic using KafkaIO.write() But 
> I need to be able to get topic name dynamically based on the data received. 
> For example, I would like to send data for one tenant to topic "data_feed_1" 
> and for another tenant to topic "data_feed_999".
> I'm coming from Flink where it's possible via 
> KeyedSerializationSchema.getTargetTopic().
> Is there anything similar in KafkaIO?
> 
> Thanks,
> Dmitry
> -- 
> --
> Dmitry
> 



Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
So if I run 5 workers with 50 shards, I end up with:

Duration    Bytes received    Records received
2m 39s      900 MB            465,525
2m 39s      1.76 GB           930,720
2m 39s      789 MB            407,315
2m 39s      1.32 GB           698,262
2m 39s      788 MB            407,310

Still not good, but better than with 5 shards, where some workers did not
participate at all.
So the problem is in some layer which distributes keys / shards among workers?

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax  wrote:

> withNumShards(5) generates 5 random shards. It turns out that
> statistically when you generate 5 random shards and you have 5 works, the
> probability is reasonably high that some workers will get more than one
> shard (and as a result not all workers will participate). Are you able to
> set the number of shards larger than 5?
>
> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek 
> wrote:
>
>> cc (dev)
>>
>> I tried to run the example with FlinkRunner in batch mode and received
>> again bad data spread among the workers.
>>
>> When I tried to remove number of shards for batch mode in above example,
>> pipeline crashed before launch
>>
>> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
>> incompatible triggers:
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour,
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(
>>
>>
>>
>>
>>
>> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek 
>> wrote:
>>
>>> Hi Max,
>>>
>>> I forgot to mention that example is run in streaming mode, therefore I
>>> can not do writes without specifying shards. FileIO explicitly asks for
>>> them.
>>>
>>> I am not sure where the problem is. FlinkRunner is only one I used.
>>>
>>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels 
>>> wrote:
>>>
 Hi Jozef,

 This does not look like a FlinkRunner related problem, but is caused by
 the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
 which apparently does not lead to good data spread in your case.

 Do you see the same behavior without `withNumShards(5)`?

 Thanks,
 Max

 On 22.10.18 11:57, Jozef Vilcek wrote:
 > Hello,
 >
 > I am having some trouble to get a balanced write via FileIO. Workers
 at
 > the shuffle side where data per window fire are written to the
 > filesystem receive unbalanced number of events.
 >
 > Here is a naive code example:
 >
 >  val read = KafkaIO.read()
 >  .withTopic("topic")
 >  .withBootstrapServers("kafka1:9092")
 >  .withKeyDeserializer(classOf[ByteArrayDeserializer])
 >  .withValueDeserializer(classOf[ByteArrayDeserializer])
 >  .withProcessingTime()
 >
 >  pipeline
 >  .apply(read)
 >  .apply(MapElements.via(new
 > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
 >override def apply(input: KafkaRecord[Array[Byte],
 > Array[Byte]]): String = {
 >  new String(input.getKV.getValue, "UTF-8")
 >}
 >  }))
 >
 >
 > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
 >  .triggering(AfterWatermark.pastEndOfWindow()
 >
 .withEarlyFirings(AfterPane.elementCountAtLeast(4))
 >
 .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
 >
 Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
 >
 >
 Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))
 >  .discardingFiredPanes()
 >  .withAllowedLateness(Duration.standardDays(7)))
 >
 >  .apply(FileIO.write()
 >  .via(TextIO.sink())
 >  .withNaming(new SafeFileNaming(outputPath, ".txt"))
 >  .withTempDirectory(tempLocation)
 >  .withNumShards(5))
 >
 >
 > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
 > number of shards), I would expect that each worker will participate
 on
 > persisting shards and equally, since code uses fixed number of shards
 > (and random shard assign?). But reality is different (see 2
 attachments
 > - statistics from flink task reading from kafka and task writing to
 files)
 >
 > What am I missing? How to achieve balanced writes?
 >
 > Thanks,
 > Jozef
 >
 >

>>>


Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
cc (dev)

I tried to run the example with FlinkRunner in batch mode and again saw a
bad data spread among the workers.

When I tried to remove the number of shards for batch mode in the above
example, the pipeline crashed before launch:

Caused by: java.lang.IllegalStateException: Inputs to Flatten had
incompatible triggers:
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour,
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(





On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek  wrote:

> Hi Max,
>
> I forgot to mention that example is run in streaming mode, therefore I can
> not do writes without specifying shards. FileIO explicitly asks for them.
>
> I am not sure where the problem is. FlinkRunner is only one I used.
>
> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels 
> wrote:
>
>> Hi Jozef,
>>
>> This does not look like a FlinkRunner related problem, but is caused by
>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>> which apparently does not lead to good data spread in your case.
>>
>> Do you see the same behavior without `withNumShards(5)`?
>>
>> Thanks,
>> Max
>>
>> On 22.10.18 11:57, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > I am having some trouble to get a balanced write via FileIO. Workers at
>> > the shuffle side where data per window fire are written to the
>> > filesystem receive unbalanced number of events.
>> >
>> > Here is a naive code example:
>> >
>> >  val read = KafkaIO.read()
>> >  .withTopic("topic")
>> >  .withBootstrapServers("kafka1:9092")
>> >  .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >  .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >  .withProcessingTime()
>> >
>> >  pipeline
>> >  .apply(read)
>> >  .apply(MapElements.via(new
>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >override def apply(input: KafkaRecord[Array[Byte],
>> > Array[Byte]]): String = {
>> >  new String(input.getKV.getValue, "UTF-8")
>> >}
>> >  }))
>> >
>> >
>> > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >  .triggering(AfterWatermark.pastEndOfWindow()
>> >  .withEarlyFirings(AfterPane.elementCountAtLeast(4))
>> >
>> .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >
>> Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> >
>> >
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))
>> >  .discardingFiredPanes()
>> >  .withAllowedLateness(Duration.standardDays(7)))
>> >
>> >  .apply(FileIO.write()
>> >  .via(TextIO.sink())
>> >  .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >  .withTempDirectory(tempLocation)
>> >  .withNumShards(5))
>> >
>> >
>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>> > number of shards), I would expect that each worker will participate on
>> > persisting shards and equally, since code uses fixed number of shards
>> > (and random shard assign?). But reality is different (see 2
>> attachments
>> > - statistics from flink task reading from kafka and task writing to
>> files)
>> >
>> > What am I missing? How to achieve balanced writes?
>> >
>> > Thanks,
>> > Jozef
>> >
>> >
>>
>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Juan Carlos Garcia
As Raghu said,

Just apply a regular ParDo and return a PCollectionTuple; after that you can
extract your Success Records (TupleTag) and your DeadLetter
records (TupleTag) and do whatever you want with them.
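In code that looks roughly like the sketch below; the Message type, parse() and
the input collection rawRecords are placeholders for whatever the pipeline
actually uses:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<Message> successTag = new TupleTag<Message>() {};
final TupleTag<KV<byte[], byte[]>> deadLetterTag = new TupleTag<KV<byte[], byte[]>>() {};

PCollectionTuple results =
    rawRecords.apply(          // rawRecords: PCollection<KV<byte[], byte[]>> from KafkaIO
        ParDo.of(new DoFn<KV<byte[], byte[]>, Message>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                try {
                  c.output(parse(c.element().getValue()));   // parse(): placeholder
                } catch (Exception e) {
                  c.output(deadLetterTag, c.element());      // keep the raw record
                }
              }
            })
            .withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

PCollection<Message> parsed = results.get(successTag);
PCollection<KV<byte[], byte[]>> deadLetters = results.get(deadLetterTag);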


Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize explicitly in
> a ParDo, which gives complete control on how to handle record errors. This
> is what I would do if I needed to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient for
> users in many such scenarios. This is simpler than each source supporting
> it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
> wrote:
>
>> Given that KafkaIO uses the UnboundedSource framework, this is probably not
>> something that can easily be supported. We might be able to support similar
>> features when we have Kafka on top of Splittable DoFn though.
>>
> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:
>>
>>> This is a great question. I've added the dev list to be sure it gets
>>> noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias 
>>> wrote:
>>>

 Hi,

 Is there a way to get a Deadletter Output from a pipeline that uses a
 KafkaIO
 connector for its input? As `TimestampPolicyFactory.withTimestampFn()`
 takes
 only a SerializableFunction and not a ParDo, how would I be able to
 produce a
 Deadletter output from it?

 I have the following pipeline defined that reads from a KafkaIO input:

 pipeline.apply(
   KafkaIO.read()
 .withBootstrapServers(bootstrap)
 .withTopics(topics)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(ConfigurableDeserializer.class)
 .updateConsumerProperties(
 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
 inputMessagesConfig))
 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
 "earliest"))
 .updateConsumerProperties(ImmutableMap.of("group.id",
 "beam-consumers"))
 .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
 "true"))
 .withTimestampPolicyFactory(
 TimestampPolicyFactory.withTimestampFn(
 new MessageTimestampExtractor(inputMessagesConfig)))
 .withReadCommitted()
 .commitOffsetsInFinalize())


 and I'd like to get deadletter outputs when my timestamp extraction fails.

 Best,
 Tobi