Re: [ANNOUNCE] New PMC Member: Alexey Romanenko

2020-06-16 Thread rahul patwari
Congrats Alexey!

On Tue, Jun 16, 2020 at 9:27 PM Ismaël Mejía  wrote:

> Please join me and the rest of Beam PMC in welcoming Alexey Romanenko as
> our
> newest PMC member.
>
> Alexey has significantly contributed to the project in different ways: new
> features and improvements in the Spark runner(s) as well as maintenance of
> multiple IO connectors including some of our most used ones (Kafka and
> Kinesis/AWS). Alexey is also quite active helping new contributors and our
> user community on the mailing lists, Slack, and Stack Overflow.
>
> Congratulations Alexey!  And thanks for being a part of Beam!
>
> Ismaël
>


Re: [VOTE] Release 2.21.0, release candidate #1

2020-05-19 Thread rahul patwari
Hi Luke,

The release is not severely broken without PR #11609.
The PR ensures that, while building a Row with a logical type field, the
input value provided is valid. For the FixedBytes logical type with length
10, for example, the valid input value is a byte array of length 10.
Without this PR, the Row will still be built for a FixedBytes field even
when the input value is shorter than the expected length.
As long as the input values provided are correct, there shouldn't be any
problems.
I will change the fix version to 2.22.0 for BEAM-9887
<https://issues.apache.org/jira/browse/BEAM-9887>.
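
For reference, here is a rough sketch of the kind of check the PR adds (the
FixedBytes class in org.apache.beam.sdk.schemas.logicaltypes and the exact
builder methods may differ slightly between Beam versions):

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.values.Row;

// A schema with a single FixedBytes(10) logical type field.
Schema schema =
    Schema.builder().addLogicalTypeField("payload", FixedBytes.of(10)).build();

// Valid input: a byte array of exactly 10 bytes.
Row valid = Row.withSchema(schema).addValue(new byte[10]).build();

// Without PR #11609 this also builds silently; with the PR, building a Row whose
// input does not match the declared length should fail fast instead.
Row invalid = Row.withSchema(schema).addValue(new byte[5]).build();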

Regards,
Rahul

On Wed, May 20, 2020 at 8:51 AM Luke Cwik  wrote:

> Rahul, do you believe that the release is severely broken enough without PR
> #11609 to require another release candidate, or would waiting till 2.22
> (which is due to be cut tomorrow) be sufficient?
>
> On Tue, May 19, 2020 at 8:13 PM rahul patwari 
> wrote:
>
>> Hi,
>>
>> Can the PR: https://github.com/apache/beam/pull/11609 be cherry-picked
>> for 2.21.0 release?
>> If not, the fix version has to be changed for BEAM-9887
>> <https://issues.apache.org/jira/browse/BEAM-9887>.
>>
>> Regards,
>> Rahul
>>
>> On Wed, May 20, 2020 at 6:05 AM Ahmet Altay  wrote:
>>
>>> +1, I validated python 2 and 3 quickstarts.
>>>
>>> On Tue, May 19, 2020 at 4:57 PM Hannah Jiang 
>>> wrote:
>>>
>>>> I confirmed that licenses/notices/source code are added to Java and
>>>> Python docker images as expected.
>>>>
>>>>
>>>> On Tue, May 19, 2020 at 2:36 PM Kyle Weaver 
>>>> wrote:
>>>>
>>>>> Thanks for bringing that up Steve. I'll leave it to others to vote on
>>>>> whether that necessitates an RC #2.
>>>>>
>>>>> On Tue, May 19, 2020 at 5:22 PM Steve Niemitz 
>>>>> wrote:
>>>>>
>>>>>> https://issues.apache.org/jira/browse/BEAM-10015 was marked as 2.21
>>>>>> but isn't in the RC1 tag.  It's marked as P1, and seems like the
>>>>>> implication is that without the fix, pipelines can produce incorrect 
>>>>>> data.
>>>>>> Is this a blocker?
>>>>>>
>>>>>
>>> +Reuven Lax , would this be a release blocker?
>>>
>>>
>>>>
>>>>>> On Tue, May 19, 2020 at 4:51 PM Kyle Weaver 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>> Please review and vote on the release candidate #1 for the version
>>>>>>> 2.21.0, as follows:
>>>>>>> [ ] +1, Approve the release
>>>>>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>>>>>
>>>>>>>
>>>>>>> The complete staging area is available for your review, which
>>>>>>> includes:
>>>>>>> * JIRA release notes [1],
>>>>>>> * the official Apache source release to be deployed to
>>>>>>> dist.apache.org [2], which is signed with the key with fingerprint
>>>>>>> F11E37D7F006D086232876797B6D6673C79AEA72 [3],
>>>>>>> * all artifacts to be deployed to the Maven Central Repository [4],
>>>>>>> * source code tag "v2.21.0-RC1" [5],
>>>>>>> * website pull request listing the release [6], publishing the API
>>>>>>> reference manual [7], and the blog post [8].
>>>>>>> * Java artifacts were built with Maven 3.6.3 and OpenJDK/Oracle JDK
>>>>>>> 1.8.0.
>>>>>>> * Python artifacts are deployed along with the source release to the
>>>>>>> dist.apache.org [2].
>>>>>>> * Validation sheet with a tab for 2.21.0 release to help with
>>>>>>> validation [9].
>>>>>>> * Docker images published to Docker Hub [10].
>>>>>>>
>>>>>>> The vote will be open for at least 72 hours. It is adopted by
>>>>>>> majority approval, with at least 3 PMC affirmative votes.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Kyle
>>>>>>>
>>>>>>> [1]
>>>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527=12347143
>>>>>>> [2] https://dist.apache.org/repos/dist/dev/beam/2.21.0/
>>>>>>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>>>>>>> [4]
>>>>>>> https://repository.apache.org/content/repositories/orgapachebeam-1103/
>>>>>>> [5] https://github.com/apache/beam/releases/tag/v2.21.0-RC1
>>>>>>> [6] https://github.com/apache/beam/pull/11727
>>>>>>> [7] https://github.com/apache/beam-site/pull/603
>>>>>>> [8] https://github.com/apache/beam/pull/11729
>>>>>>> [9]
>>>>>>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=275707202
>>>>>>> [10] https://hub.docker.com/search?q=apache%2Fbeam=image
>>>>>>>
>>>>>>


Re: [VOTE] Release 2.21.0, release candidate #1

2020-05-19 Thread rahul patwari
Hi,

Can the PR: https://github.com/apache/beam/pull/11609 be cherry-picked for
2.21.0 release?
If not, the fix version has to be changed for BEAM-9887
.

Regards,
Rahul

On Wed, May 20, 2020 at 6:05 AM Ahmet Altay  wrote:

> +1, I validated python 2 and 3 quickstarts.
>
> On Tue, May 19, 2020 at 4:57 PM Hannah Jiang 
> wrote:
>
>> I confirmed that licenses/notices/source code are added to Java and
>> Python docker images as expected.
>>
>>
>> On Tue, May 19, 2020 at 2:36 PM Kyle Weaver  wrote:
>>
>>> Thanks for bringing that up Steve. I'll leave it to others to vote on
>>> whether that necessitates an RC #2.
>>>
>>> On Tue, May 19, 2020 at 5:22 PM Steve Niemitz 
>>> wrote:
>>>
 https://issues.apache.org/jira/browse/BEAM-10015 was marked as 2.21
 but isn't in the RC1 tag.  It's marked as P1, and seems like the
 implication is that without the fix, pipelines can produce incorrect data.
 Is this a blocker?

>>>
> +Reuven Lax , would this be a release blocker?
>
>
>>
 On Tue, May 19, 2020 at 4:51 PM Kyle Weaver 
 wrote:

> Hi everyone,
> Please review and vote on the release candidate #1 for the version
> 2.21.0, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org
> [2], which is signed with the key with fingerprint
> F11E37D7F006D086232876797B6D6673C79AEA72 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v2.21.0-RC1" [5],
> * website pull request listing the release [6], publishing the API
> reference manual [7], and the blog post [8].
> * Java artifacts were built with Maven 3.6.3 and OpenJDK/Oracle JDK
> 1.8.0.
> * Python artifacts are deployed along with the source release to the
> dist.apache.org [2].
> * Validation sheet with a tab for 2.21.0 release to help with
> validation [9].
> * Docker images published to Docker Hub [10].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Kyle
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527=12347143
> [2] https://dist.apache.org/repos/dist/dev/beam/2.21.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4]
> https://repository.apache.org/content/repositories/orgapachebeam-1103/
> [5] https://github.com/apache/beam/releases/tag/v2.21.0-RC1
> [6] https://github.com/apache/beam/pull/11727
> [7] https://github.com/apache/beam-site/pull/603
> [8] https://github.com/apache/beam/pull/11729
> [9]
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=275707202
> [10] https://hub.docker.com/search?q=apache%2Fbeam=image
>



Re: Parallelism in Combine.GroupedValues

2020-05-12 Thread rahul patwari
Hi Luke,

I should have been more clear with my question. Sorry, my bad.

I wanted to ask: how can the combine happen in parallel using only *one
accumulator instance*?

It is explicitly specified in CombineFn.apply()[4] that mergeAccumulators()
will not be called: a single accumulator instance is created and used to
combine all the values associated with a single key.
As Combine.GroupedValues uses CombineFn.apply(), the parallelism for
combining the values of a key will only be one.
And as Combine.perKey() uses Combine.GroupedValues to combine the values of
a key, the parallelism of that combine will also be limited to one.

Do you mean that these types of Combine will be translated to a lifted
Combine (assuming no side inputs are provided) depending on the runner?
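
For reference, a minimal (illustrative) CombineFn sketch; CombineFn.apply()
effectively runs only the createAccumulator()/addInput() path below over a
single accumulator, while mergeAccumulators() only comes into play when the
runner lifts the combine into partial combines:

import org.apache.beam.sdk.transforms.Combine;

// A simple sum CombineFn over Longs, with the accumulator also being a Long.
public class SumFn extends Combine.CombineFn<Long, Long, Long> {
  @Override
  public Long createAccumulator() {
    return 0L;
  }

  @Override
  public Long addInput(Long accumulator, Long input) {
    return accumulator + input;
  }

  @Override
  public Long mergeAccumulators(Iterable<Long> accumulators) {
    Long merged = createAccumulator();
    for (Long accumulator : accumulators) {
      merged += accumulator;
    }
    return merged;
  }

  @Override
  public Long extractOutput(Long accumulator) {
    return accumulator;
  }
}

Combine.perKey(new SumFn()) would be the typical use; whether the runner
creates several accumulators per key (and merges them later) is a
runner-side optimization rather than something CombineFn.apply() does
itself.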

Regards,
Rahul

[4]:
https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L441

On Tue, May 12, 2020 at 8:35 PM Luke Cwik  wrote:

> There is more than one instance of an accumulator being used and then
> those accumulators are merged using mergeAccumulators method.
>
> Two examples of when combining happens in parallel is when the withFewKeys
> hint is used on the combiner or when there is partial combining[1]
> happening on the mapper side before the grouping operation.
>
> 1: https://s.apache.org/beam-runner-api-combine-model
>
> On Tue, May 12, 2020 at 7:05 AM rahul patwari 
> wrote:
>
>> Hi,
>>
>> In the Javadoc for Combine.GroupedValues[1], it has been described that 
>> *combining
>> the values associated with a single key can happen in parallel*.
>> The logic to combine values associated with a key can be provided by
>> CombineFnWithContext (or) CombineFn.
>> Both CombineFnWithContext.apply()[2] and CombineFn.apply()[3] use a
>> single accumulator to combine the values.
>>
>> My understanding is that the parallelism in Combine PTransform will be
>> determined by the no. of accumulators. But, the Javadoc describes that
>> combining is done in parallel even though the no. of accumulators used to
>> combine is one.
>>
>> How can the combine happen in parallel using only one accumulator?
>>
>> Regards,
>> Rahul
>>
>> [1]:
>> https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L2075-L2078
>> [2]:
>> https://github.com/apache/beam/blob/53e5cee254023152e77a3fc46564642dc9b6b506/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java#L117
>> [3]:
>> https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L443
>>
>


Parallelism in Combine.GroupedValues

2020-05-12 Thread rahul patwari
Hi,

The Javadoc for Combine.GroupedValues[1] describes that *combining the
values associated with a single key can happen in parallel*.
The logic to combine the values associated with a key can be provided by
either CombineFnWithContext or CombineFn.
Both CombineFnWithContext.apply()[2] and CombineFn.apply()[3] use a single
accumulator to combine the values.

My understanding is that the parallelism of the Combine PTransform is
determined by the number of accumulators. But the Javadoc describes the
combining as parallel even though only one accumulator is used.

How can the combine happen in parallel using only one accumulator?

Regards,
Rahul

[1]:
https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L2075-L2078
[2]:
https://github.com/apache/beam/blob/53e5cee254023152e77a3fc46564642dc9b6b506/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java#L117
[3]:
https://github.com/apache/beam/blob/334682d4a8ac5e1ebd298ba3b8020a9161884927/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L443


Re: Jenkins jobs not running for my PR 10438

2020-05-11 Thread rahul patwari
Hi,

Can you please trigger pre-commit checks for
https://github.com/apache/beam/pull/11581

Thanks,
Rahul

On Tue, May 12, 2020 at 7:12 AM Ahmet Altay  wrote:

> Done for both Yoshiki and Tomo's PRs.
>
> On Mon, May 11, 2020 at 6:33 PM Tomo Suzuki  wrote:
>
>> Hi Beam committers,
>>
>> Would you run the basic precommit checks for
>> https://github.com/apache/beam/pull/11674 ?
>>
>> Regards,
>> Tomo
>>
>


Re: NPE in Calcite dialect when input PCollection has logical type in schema, from JdbcIO Transform

2020-05-01 Thread rahul patwari
Thanks for your suggestion, Brian.

I will move the logical types explicitly defined for JdbcIO in
org.apache.beam.sdk.io.jdbc.LogicalTypes to
org.apache.beam.sdk.schemas.logicaltypes, each with a URN identifier.
If any other IO defines logical types that correspond to SQL data types,
those logical types can likewise be moved to
org.apache.beam.sdk.schemas.logicaltypes, or the IO can reuse the logical
types already defined in that package, so that these IOs can be used with
BeamSql.
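
As a rough sketch of what such a shared logical type could look like (this
assumes the Schema.LogicalType interface shape of recent Beam releases,
where getArgumentType()/getArgument() exist as part of the portable schema
work; the URN string below is made up for illustration):

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

/** Illustrative only: a fixed-length bytes logical type identified by a URN. */
public class FixedLengthBytes implements Schema.LogicalType<byte[], byte[]> {
  // Hypothetical URN; the real identifier would be agreed on in schemas.logicaltypes.
  public static final String IDENTIFIER = "beam:logical_type:fixed_bytes:v1";

  private final int length;

  public FixedLengthBytes(int length) {
    this.length = length;
  }

  @Override
  public String getIdentifier() {
    return IDENTIFIER;
  }

  @Override
  public FieldType getArgumentType() {
    return FieldType.INT32;
  }

  @Override
  public Integer getArgument() {
    return length;
  }

  @Override
  public FieldType getBaseType() {
    return FieldType.BYTES;
  }

  @Override
  public byte[] toBaseType(byte[] input) {
    return input;
  }

  @Override
  public byte[] toInputType(byte[] base) {
    if (base.length > length) {
      throw new IllegalArgumentException(
          "Expected at most " + length + " bytes, got " + base.length);
    }
    return base;
  }
}

The idea is that both JdbcIO and the SQL CalciteUtils mapping would then
refer to the type through this single identifier instead of each defining
their own.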

On Sat, May 2, 2020 at 3:00 AM Brian Hulette  wrote:

>
>
> On Thu, Apr 30, 2020 at 11:26 PM rahul patwari 
> wrote:
>
>> Hi,
>>
>> A JIRA ticket is raised to track this bug: BEAM-8307
>> <https://issues.apache.org/jira/browse/BEAM-8307>
>>
>> I have raised a PR: https://github.com/apache/beam/pull/11581 to fix the
>> issue.
>>
>> This PR takes care of using BeamSql with JdbcIO.
>> I would be interested to contribute if any other IOs supported by Beam
>> requires a similar fix like the one in this PR so that they can be used
>> with BeamSql.
>>
>> What could be a cleaner approach, in general, to handle this for all the
>> IOs?
>>
>> Also, what can be done to support BeamSql with User-Defined Logical
>> Types?
>> Should they be converted to one of the Beam SQL Types[1] before applying
>> SqlTransform.query()?
>> Should we expose an interface to provide Calcite RelDataType Mapping for
>> User-Defined Logical Types?
>>
>
> First I want to say I don't think the JdbcIO types you're adding fixes for
> in https://github.com/apache/beam/pull/11581 should be considered
> "user-defined logical types". These are all well known types that we
> *should* have standard logical types for in schemas.logicaltypes, with a
> URN identifier. Then both JDBC and SQL can reference those same logical
> types and there's no need to add a mapping in CalciteUtils. This is what 
> +Robin
> Qiu  is doing for DATE in [1].
>
> As far as user-defined logical types in general, personally I don't think
> SqlTransform should accept them. I brought this up at the end of a recent
> message [2]. I'll copy the relevant part here so you don't have to wade
> through it :)
>
> > ... I'm against the idea of
> > implicitly stripping away logical types like this in schema-aware
> > transforms. A logical type indicates that the value has some additional
> > meaning beyond just the base type (and maybe a restricted range), and I
> > don't think transforms should be able to ignore that meaning unless the
> > user explicitly allows it, or first converts to the base type themselves.
>
> If users really want to process custom logical types with SQL, they could
> just convert them to their base types first. If this is common, we could
> add an option to SqlTransform to have it do this automatically for
> unrecognized types.
>
> [1] https://github.com/apache/beam/pull/11272
> [2]
> https://lists.apache.org/thread.html/r2e05355b74fb5b8149af78ade1e3539ec08371a9a4b2b9e45737e6be%40%3Cdev.beam.apache.org%3E
>
>
>
>> Let me know your thoughts.
>>
>> [1]:
>> https://github.com/apache/beam/blob/b8aa8486f336df6fc9cf581f29040194edad3b87/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java#L43
>>
>> Regards,
>> Rahul
>>
>


NPE in Calcite dialect when input PCollection has logical type in schema, from JdbcIO Transform

2020-05-01 Thread rahul patwari
Hi,

A JIRA ticket is raised to track this bug: BEAM-8307


I have raised a PR: https://github.com/apache/beam/pull/11581 to fix the
issue.

This PR takes care of using BeamSql with JdbcIO.
I would be interested in contributing if any other IOs supported by Beam
require a similar fix to the one in this PR, so that they can be used with
BeamSql.

What could be a cleaner approach, in general, to handle this for all the
IOs?

Also, what can be done to support BeamSql with User-Defined Logical Types?
Should they be converted to one of the Beam SQL Types[1] before applying
SqlTransform.query()?
Should we expose an interface to provide Calcite RelDataType Mapping for
User-Defined Logical Types?

Let me know your thoughts.

[1]:
https://github.com/apache/beam/blob/b8aa8486f336df6fc9cf581f29040194edad3b87/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java#L43

Regards,
Rahul


Re: Jenkins jobs not running for my PR 10438

2020-04-30 Thread rahul patwari
Hi Committers,

Can you please trigger tests for  https://github.com/apache/beam/pull/11569
and https://github.com/apache/beam/pull/11581

Thanks,
Rahul

On Tue, 28 Apr 2020, 10:58 pm Alexey Romanenko, 
wrote:

> Thanks Udi! I'll track for updates on this.
>
> On 28 Apr 2020, at 19:16, Udi Meiri  wrote:
>
> Alexey, what you're doing should be working (commits should trigger tests,
> as should "retest this please" and other phrases).
>
> https://issues.apache.org/jira/browse/INFRA-19836 tracks this issue
>
> On Tue, Apr 28, 2020 at 10:04 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> Does anyone know the “golden rule” how to trigger Jenkins tests?
>>
>> For example:
>> https://github.com/apache/beam/pull/11341
>> I tried several times and it’s still not triggered.
>>
>> On 28 Apr 2020, at 13:33, Ismaël Mejía  wrote:
>>
>> done
>>
>> On Tue, Apr 28, 2020 at 12:47 PM Shoaib Zafar <
>> shoaib.za...@venturedive.com> wrote:
>>
>>> Hello Beam Committers,
>>>
>>> I would appreciate if you could trigger precommit checks for the PR:
>>> https://github.com/apache/beam/pull/11210 along with the python
>>> post-commit check (Run Python 3.5 PostCommit).
>>>
>>> Thanks and Regards.
>>>
>>> *Shoaib Zafar*
>>> Software Engineering Lead
>>> Mobile: +92 333 274 6242
>>> Skype: live:shoaibzafar_1
>>>
>>> 
>>>
>>>
>>> On Wed, Apr 22, 2020 at 9:40 PM Rehman Murad Ali <
>>> rehman.murad...@venturedive.com> wrote:
>>>
 Hello Beam Committers.

 Would you please trigger basic tests as well as all *validatesRunner*
 test on this PR:
 https://github.com/apache/beam/pull/11154 
 


 *Thanks & Regards*



 *Rehman Murad Ali*
 Software Engineer
 Mobile: +92 3452076766 <+92%20345%202076766>
 Skype: rehman.muradali


 On Wed, Apr 22, 2020 at 9:25 PM Yoshiki Obata 
 wrote:

> Hello Beam Committers,
>
> I would appreciate if you could trigger precommit checks for these PRs;
> https://github.com/apache/beam/pull/11493
> https://github.com/apache/beam/pull/11494
>
> Regards
> yoshiki
>
> 2020年4月21日(火) 1:11 Luke Cwik :
>
>> The precommits started and I provided the comments for the
>> postcommits as you have requested but they have yet to start.
>>
>> On Mon, Apr 20, 2020 at 8:31 AM Shoaib Zafar <
>> shoaib.za...@venturedive.com> wrote:
>>
>>> Hello Beam Committers.
>>>
>>> Would you please trigger the pre-commit checks on the PR:
>>> https://github.com/apache/beam/pull/11210 along with the python
>>> post-commit checks (Run Python PostCommit, Run Python 3.5 PostCommit)?
>>>
>>> Thanks! Regards,
>>>
>>> *Shoaib Zafar*
>>> Software Engineering Lead
>>> Mobile: +92 333 274 6242
>>> Skype: live:shoaibzafar_1
>>>
>>> 
>>>
>>>
>>> On Fri, Apr 17, 2020 at 1:19 PM Ismaël Mejía 
>>> wrote:
>>>
 done

 On Thu, Apr 16, 2020 at 4:32 PM Rehman Murad Ali <
 rehman.murad...@venturedive.com> wrote:

> Hello Beam Committers.
>
> Would you please trigger basic tests as well as validatesRunner
> test on this PR:
>
> 
> https://github.com/apache/beam/pull/11350
>
>
> *Thanks & Regards*
>
>
>
> *Rehman Murad Ali*
> Software Engineer
> Mobile: +92 3452076766 <+92%20345%202076766>
> Skype: rehman.muradali
>
>
> On Mon, Apr 13, 2020 at 10:16 PM Ahmet Altay 
> wrote:
>
>> Done.
>>
>> On Mon, Apr 13, 2020 at 8:52 AM Shoaib Zafar <
>> shoaib.za...@venturedive.com> wrote:
>>
>>> Hello Beam Committers.
>>>
>>> Would you please trigger the pre-commit checks on the PR:
>>> https://github.com/apache/beam/pull/11210 along with the python
>>> post-commit checks (Run Python PostCommit, Run Python 3.5 
>>> PostCommit)?
>>>
>>> Thanks!
>>>
>>> *Shoaib Zafar*
>>> Software Engineering Lead
>>> Mobile: +92 333 274 6242
>>> Skype: live:shoaibzafar_1
>>>
>>> 
>>>
>>>
>>> On Mon, Apr 13, 2020 at 4:00 PM Ismaël Mejía 
>>> wrote:
>>>
 done

 On Mon, Apr 13, 2020 at 12:42 PM Rehman Murad Ali
  wrote:
 >
 > Hi Beam Committers!
 >
 > Thanks( Ismael )
 >
 > I appreciate if someone could trigger these tests on this PR
 https://github.com/apache/beam/pull/11154
 >
 > run dataflow validatesrunner
 > run flink 

Latency in advancing Spark Watermarks

2020-03-19 Thread rahul patwari
Hi,

*Usage Info*:
We are using Beam: 2.16.0, Spark: 2.4.2
We are running Spark on Kubernetes.
We are using Spark Streaming(legacy) Runner with Beam Java SDK
The Pipeline has been run with default configurations i.e. default
configurations for SparkPipelineOptions.

*Issue*:
When a Beam Pipeline is submitted to Spark, there is latency in getting the
results of an aggregation, and this latency increases with every new
window. The Pipeline is run with 1-minute fixed windows and the default
trigger (which fires when the Watermark crosses the end of the window). The
same pipeline works as expected on the Flink and Direct runners.

*Initial Analysis*:
The Watermarks for a batch (500ms by default) are broadcast when the
onBatchCompleted()[1] event for that batch is triggered. We are observing
latency between the time when the batch completes and the time when the
onBatchCompleted() event is triggered for it.

For example, if the batch completes at 09:29:30 (HH:MM:SS), the
onBatchCompleted() event for the batch is triggered at 09:29:35, a 5-second
delay in this example. We determined the batch completion time from the
Spark Driver UI and the Driver logs.
I have asked a question about this latency on the Spark user mailing
list[2] and am waiting for a response.

The Watermarks for each batch are added to a queue. When the
onBatchCompleted() event is triggered for a batch, the watermarks are polled
from the queue and broadcast[3]. Because of the delay between batch
completion and the onBatchCompleted() event, the Watermarks advance late,
which in turn delays the emission of the aggregation results. As the
Pipeline progresses, watermarks are added to the queue faster than they are
polled from it and broadcast, and the latency between batch completion and
the onBatchCompleted() event keeps increasing.
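
To make the effect concrete, here is a tiny standalone model of the behavior
described above (this is *not* Beam's GlobalWatermarkHolder code, just an
illustration of how a fixed extra listener delay turns into an ever-growing
backlog of queued watermark blocks):

import java.util.ArrayDeque;
import java.util.Deque;

public class WatermarkBacklogModel {
  public static void main(String[] args) {
    Deque<Long> pendingWatermarks = new ArrayDeque<>();
    final long batchIntervalMs = 500;    // one watermark block enqueued per batch
    final long listenerDelayMs = 5_000;  // extra delay before onBatchCompleted() fires

    long lastPollMs = 0;
    for (long now = 0; now <= 10 * 60_000; now += batchIntervalMs) {
      pendingWatermarks.addLast(now);                  // producer: one block per batch
      if (now - lastPollMs >= batchIntervalMs + listenerDelayMs) {
        pendingWatermarks.pollFirst();                 // consumer: one poll per listener event
        lastPollMs = now;
      }
    }
    // ~1200 blocks were produced but only ~110 were polled, so the queue (and
    // the watermark lag) keeps growing for as long as the pipeline runs.
    System.out.println("Backlog after 10 minutes: " + pendingWatermarks.size());
  }
}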

*Logs*:
These are the trimmed logs which show the issue in action:

*11:37:06*: INFO: scheduler.JobScheduler: "Finished job streaming job
*1584099237500* ms.4 from job set of time 1584099237500 ms"
*11:37:06*: INFO: scheduler.JobScheduler: "Total delay: 189.352 s for time
*1584099237500* ms (execution: 0.942 s)"
*11:40:30*: INFO: util.GlobalWatermarkHolder: "Put new watermark block:
{0=SparkWatermarks{lowWatermark=2020-03-13T11:37:03.621Z,
highWatermark=2020-03-13T11:37:04.882Z,
synchronizedProcessingTime=2020-03-13T11:33:57.500Z},
1=SparkWatermarks{lowWatermark=2020-03-13T11:37:03.872Z,
highWatermark=2020-03-13T11:37:05.107Z,
synchronizedProcessingTime=2020-03-13T11:33:57.500Z},
2=SparkWatermarks{lowWatermark=2020-03-13T11:37:04.204Z,
highWatermark=2020-03-13T11:37:05.377Z,
synchronizedProcessingTime=2020-03-13T11:33:57.500Z}}"
*11:40:30*: INFO:
util.GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: "Batch with
timestamp: *1584099237500* has completed, watermarks have been updated."

As you can see, there is a delay of almost *3 minutes 24 seconds* between
the time when the batch completed and the time when the onBatchCompleted()
event for the batch was triggered (i.e., when the Watermarks were advanced).

Has anyone faced this issue before?
What factors can contribute to this latency?
Thanks for any pointers to debug the issue.

[1] -
https://github.com/apache/beam/blob/de30361359b70e9fe9729f0f3d52f6c6e8462cfb/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java#L363
[2] -
http://apache-spark-user-list.1001560.n3.nabble.com/Latency-between-Batch-Completion-and-triggering-of-onBatchCompleted-event-tc37086.html
[3] -
https://github.com/apache/beam/blob/de30361359b70e9fe9729f0f3d52f6c6e8462cfb/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java#L208

Regards,
Rahul


Re: Issue with KafkaIO for list of topics

2020-02-28 Thread rahul patwari
Hi Maulik,

Currently, I don't think it is possible to filter topics based on whether
data is being produced to the topic or not.
But the Watermark logic can be changed to make the Pipeline work.

Since the timestamp of each record is the time when the event was pushed to
Kafka, the record timestamps will be monotonically increasing, except for
out-of-order events.
Instead of assigning the Watermark as BoundedWindow.TIMESTAMP_MIN_VALUE by
default, we can use [current_timestamp - some_delay] as the default, and do
the same in the getWatermark() method. That way the Watermark will advance
even if the partition is idle.

Make sure that the Watermark timestamp is monotonically increasing, and
choose the delay carefully to avoid discarding out-of-order events.

Refer to
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
for an example.
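
A rough sketch of such a policy (the class name IdleAwareTimestampPolicy,
the delay value, and extractEventTimestamp() are made up for illustration;
the TimestampPolicy/PartitionContext API is the same one used by
CustomTimestampPolicyWithLimitedDelay above):

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Illustrative policy: the watermark keeps advancing even when a partition is idle. */
public class IdleAwareTimestampPolicy extends TimestampPolicy<byte[], byte[]> {
  private final Duration maxDelay;
  private Instant currentWatermark;

  public IdleAwareTimestampPolicy(Duration maxDelay, Optional<Instant> previousWatermark) {
    this.maxDelay = maxDelay;
    // Start from (now - maxDelay) instead of TIMESTAMP_MIN_VALUE.
    this.currentWatermark = previousWatermark.orElse(Instant.now().minus(maxDelay));
  }

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<byte[], byte[]> record) {
    Instant eventTimestamp = extractEventTimestamp(record);
    if (eventTimestamp.isAfter(currentWatermark)) {
      currentWatermark = eventTimestamp; // keep the watermark monotonic
    }
    return eventTimestamp;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    // Even if the partition is idle, let the watermark advance to (now - maxDelay),
    // while never moving backwards.
    Instant idleWatermark = Instant.now().minus(maxDelay);
    if (idleWatermark.isAfter(currentWatermark)) {
      currentWatermark = idleWatermark;
    }
    return currentWatermark;
  }

  private Instant extractEventTimestamp(KafkaRecord<byte[], byte[]> record) {
    // Placeholder: in the real pipeline this would deserialize the record and
    // read its event-time field; here the Kafka record timestamp stands in.
    return new Instant(record.getTimestamp());
  }
}

It would be plugged in on the KafkaIO read with something like
withTimestampPolicyFactory((topicPartition, previousWatermark) ->
new IdleAwareTimestampPolicy(Duration.standardSeconds(30), previousWatermark)).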

Regards,
Rahul


On Fri, Feb 28, 2020 at 6:54 PM Maulik Soneji 
wrote:

> Hi Rahul,
>
> Thank you very much for the detailed explanation.
>
> Since we don't know which are the topics that have zero throughputs, is
> there a way in which we can filter out such topics in KafkaIO?
>
> Since KafkaIO doesn't support passing a regex to consume data from, I am
> getting a list of topics from kafka and passing it.
>
> Is there a way to filter out such topics? Also, it can happen that when
> the job has started the topic might have no data for a few windows and
> after that, it can get some data. This filter should be dynamic as well.
>
> Please share some ideas on how we can make this work.
>
> Community members, please share your thoughts as well on how we can
> achieve this.
>
> Thanks and regards,
> Maulik
>
> On Fri, Feb 28, 2020 at 3:03 PM rahul patwari 
> wrote:
>
>> Hi Maulik,
>>
>> This seems like an issue with Watermark.
>> According to
>> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240
>> ,
>>
>> If there are multiple partitions (or) multiple topics, Watermark will be
>> calculated for each of the partition and the minimum watermark is
>> considered as the current Watermark.
>> Assuming that no message is pushed to the topic with 0 throughput,
>> according to your logic for the watermark calculation, the watermark of
>> each partition for this topic will be BoundedWindow.TIMESTAMP_MIN_VALUE
>> (the smallest representable timestamp of an element -
>> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44
>> ).
>>
>> As the result will be emitted from GroupByKey when the Watermark crosses
>> the window and as the watermark is BoundedWindow.TIMESTAMP_MIN_VALUE,
>> you are not seeing the results from GroupByKey.
>>
>> Regards,
>> Rahul
>>
>> On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji 
>> wrote:
>>
>>> *Observations:*
>>> If we read using KafkaIO for a list of topics where one of the topics
>>> has zero throughputs,
>>> and KafkaIO is followed by GroupByKey stage, then:
>>> a. No data is output from GroupByKey stage for all the topics and not
>>> just the zero throughput topic.
>>>
>>> If all topics have some throughput coming in, then it works fine and we
>>> get some output from GroupByKey stage.
>>>
>>> Is this an issue?
>>>
>>> *Points:*
>>> a. The output from GroupByKey is only when all topics have some
>>> throughput
>>> b. This is a problem with KafkaIO + GroupByKey, for case where I have
>>> FileIO + GroupByKey, this issue doesn't arise. GroupByKey outputs some data
>>> even if there is no data for one of the files.
>>> c. Not a runner issue, since I ran it with FlinkRunner and DataflowRunner
>>> d. Even if lag is different for each topic on the list, we still get
>>> some output from GroupByKey.
>>>
>>>
>>> *Debugging:*While Debugging this issue I found that in split function
>>> of KafkaUnboundedSource we create KafkaUnboundedSource where partition list
>>> is one partition for each topic.
>>>
>>> I am not sure if this is some issue with watermark, since watermark for
>>> the topic with no throughput will not advance. But this looks like the most
>>> likely cause to me.
>>>
>>> *Please help me in figuring out whether this is an issue or if there is
>>> something wrong with my pipeline.*
>>>
>>> At

Re: Issue with KafkaIO for list of topics

2020-02-28 Thread rahul patwari
Hi Maulik,

This seems like an issue with Watermark.
According to
https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240
,

If there are multiple partitions or multiple topics, a Watermark is
calculated for each partition, and the minimum of these watermarks is
considered the current Watermark.
Assuming that no message is pushed to the topic with 0 throughput,
according to your logic for the watermark calculation, the watermark of
each partition for this topic will be BoundedWindow.TIMESTAMP_MIN_VALUE
(the smallest representable timestamp of an element -
https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44
).

As the result is emitted from GroupByKey only when the Watermark crosses
the end of the window, and the watermark is stuck at
BoundedWindow.TIMESTAMP_MIN_VALUE, you are not seeing any results from
GroupByKey.

Regards,
Rahul

On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji 
wrote:

> *Observations:*
> If we read using KafkaIO for a list of topics where one of the topics has
> zero throughputs,
> and KafkaIO is followed by GroupByKey stage, then:
> a. No data is output from GroupByKey stage for all the topics and not just
> the zero throughput topic.
>
> If all topics have some throughput coming in, then it works fine and we
> get some output from GroupByKey stage.
>
> Is this an issue?
>
> *Points:*
> a. The output from GroupByKey is only when all topics have some throughput
> b. This is a problem with KafkaIO + GroupByKey, for case where I have
> FileIO + GroupByKey, this issue doesn't arise. GroupByKey outputs some data
> even if there is no data for one of the files.
> c. Not a runner issue, since I ran it with FlinkRunner and DataflowRunner
> d. Even if lag is different for each topic on the list, we still get some
> output from GroupByKey.
>
>
> *Debugging:*While Debugging this issue I found that in split function of
> KafkaUnboundedSource we create KafkaUnboundedSource where partition list is
> one partition for each topic.
>
> I am not sure if this is some issue with watermark, since watermark for
> the topic with no throughput will not advance. But this looks like the most
> likely cause to me.
>
> *Please help me in figuring out whether this is an issue or if there is
> something wrong with my pipeline.*
>
> Attaching detailed pipeline information for more details:
>
> *Context:*
> I am currently using KafkaIO to read data from kafka for a list of topics
> with a custom timestamp policy.
>
> Below is how I am constructing KafkaIO reader:
>
> return KafkaIO.<byte[], byte[]>read()
> .withBootstrapServers(brokers)
> .withTopics(topics)
> .withKeyDeserializer(ByteArrayDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class)
> .withTimestampPolicyFactory((partition, previousWatermark) -> new 
> EventTimestampPolicy(godataService, previousWatermark))
> .commitOffsetsInFinalize();
>
> *Pipeline Information:*
> The Pipeline consists of seven steps:
> a. Read from Kafka with a custom timestamp policy
> b. Convert KafkaRecord to a Message object
> c. Window based on a FixedWindow of 10 minutes, triggering AfterWatermark
> d. PCollection<Message> to PCollection<KV<String, Message>>, where the Topic is the Key
> e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
> f. PCollection<KV<String, Iterable<Message>>> to an output PCollection for each topic
> g. Write output to Kafka
>
> *Detailed Pipeline Information*
> a. Read data from Kafka to get KafkaRecord<byte[], byte[]>
> Here I am using my own timestamp policy which looks like below:
>
> public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
>     this.myService = myService;
>     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
> }
>
> @Override
> public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
>     Instant eventTimestamp;
>     try {
>         eventTimestamp = Deserializer.getEventTimestamp(record, myService);
>     } catch (InvalidProtocolBufferException e) {
>         statsClient.increment("io.proto.buffer.exception");
>         throw new RuntimeException(e);
>     }
>     this.currentWatermark = eventTimestamp;
>     return this.currentWatermark;
> }
>
> @Override
> public Instant getWatermark(PartitionContext ctx) {
>     return this.currentWatermark;
> }
>
> Event timestamp is one of the fields in the kafka message. It is the time
> when the event was pushed to kafka.
>
> b. A DoFn to transform KafkaRecord<byte[], byte[]> into the Message class. The
> Message class contains properties like topic, partition, offset, and timestamp.
>
> c. Windowing on 10 minute fixed window triggering at 
> AfterWatermark.pastEndOfWindow()
>
> d. PCollection<Message> to PCollection<KV<String, Message>>
> Here the Key is the Kafka topic.
>
> e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>
>
> f. PCollection<KV<String, Iterable<Message>>> to an output PCollection
> for each topic
>
> g. Write output to kafka
>
>


Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-18 Thread rahul patwari
Hi,

I would love to join the call.
Can you also share the meeting invitation with me?

Thanks,
Rahul

On Wed 18 Sep, 2019, 11:48 PM Xinyu Liu,  wrote:

> Alexey and Etienne: I'm very happy to join the sync-up meeting. Please
> forward the meeting info to me. I am based in California, US and hopefully
> the time will work :).
>
> Thanks,
> Xinyu
>
> On Wed, Sep 18, 2019 at 6:39 AM Etienne Chauchot 
> wrote:
>
>> Hi Xinyu,
>>
>> Thanks for offering help ! My comments are inline:
>>
>> Le vendredi 13 septembre 2019 à 12:16 -0700, Xinyu Liu a écrit :
>>
>> Hi, Etienne,
>>
>> The slides are very informative! Thanks for sharing the details about how
>> the Beam API are mapped into Spark Structural Streaming.
>>
>>
>> Thanks !
>>
>> We (LinkedIn) are also interested in trying the new SparkRunner to run
>> Beam pipeine in batch, and contribute to it too. From my understanding,
>> seems the functionality on batch side is mostly complete and covers quite a
>> large percentage of the tests (a few missing pieces like state and timer in
>> ParDo and SDF).
>>
>>
>> Correct, it passes 89% of the tests, but there is more than SDF, state
>> and timer missing, there is also ongoing encoders work that I would like to
>> commit/push before merging.
>>
>> If so, is it possible to merge the new runner sooner into master so it's
>> much easier for us to pull it in (we have an internal fork) and contribute
>> back?
>>
>>
>> Sure, see my other mail on this thread. As Alexey mentioned, please join
>> the sync meeting we have, the more the merrier !
>>
>>
>> Also curious about the scheme part in the runner. Seems we can leverage
>> the schema-aware work in PCollection and translate from Beam schema to
>> Spark, so it can be optimized in the planner layer. It will be great to
>> hear back your plans on that.
>>
>>
>> Well, it is not designed yet but, if you remember my talk, we need to
>> store beam windowing information with the data itself, so ending up having
>> a dataset . One lead that was discussed is to store it as a
>> Spark schema such as this:
>>
>> 1. field1: binary data for beam windowing information (cannot be mapped
>> to fields because beam windowing info is complex structure)
>>
>> 2. fields of data as defined in the Beam schema if there is one
>>
>>
>> Congrats on this great work!
>>
>> Thanks !
>>
>> Best,
>>
>> Etienne
>>
>> Thanks,
>> Xinyu
>>
>> On Wed, Sep 11, 2019 at 6:02 PM Rui Wang  wrote:
>>
>> Hello Etienne,
>>
>> Your slide mentioned that streaming mode development is blocked because
>> Spark lacks supporting multiple-aggregations in its streaming mode but
>> design is ongoing. Do you have a link or something else to their design
>> discussion/doc?
>>
>>
>> -Rui
>>
>> On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot 
>> wrote:
>>
>> Hi Rahul,
>> Sure, and great ! Thanks for proposing !
>> If you want details, here is the presentation I did 30 mins ago at the
>> apachecon. You will find the video on youtube shortly but in the meantime,
>> here is my presentation slides.
>>
>> And here is the structured streaming branch. I'll be happy to review your
>> PRs, thanks !
>>
>> <https://github.com/apache/beam/tree/spark-runner_structured-streaming>
>> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>>
>> Best
>> Etienne
>>
>> Le mercredi 11 septembre 2019 à 16:37 +0530, rahul patwari a écrit :
>>
>> Hi Etienne,
>>
>> I came to know about the work going on in Structured Streaming Spark
>> Runner from Apache Beam Wiki - Works in Progress.
>> I have contributed to BeamSql earlier. And I am working on supporting
>> PCollectionView in BeamSql.
>>
>> I would love to understand the Runner's side of Apache Beam and
>> contribute to the Structured Streaming Spark Runner.
>>
>> Can you please point me in the right direction?
>>
>> Thanks,
>> Rahul
>>
>>


Beam Summit Videos in youtube

2019-09-18 Thread rahul patwari
Hi,

The videos of the Beam Summit that happened recently have disappeared from
the Apache Beam YouTube channel.

Is uploading the videos a WIP?

Thanks,
Rahul


Re: Watermark is lagging in Spark Runner with kafkaIO

2019-09-10 Thread rahul patwari
Forgot to mention: A FixedWindow of duration 1 minute is applied before
applying SqlTransform.

On Tue, Sep 10, 2019 at 6:03 PM rahul patwari 
wrote:

> Hi,
> I am facing this issue too.
> +dev 
>
> Here is the Pipeline that we are using(providing a very simple pipeline to
> highlight the issue):
> KafkaSource -> SqlTransform -> KafkaSink
>
> We are reading from a single topic in KafkaSource with a single partition.
>
> Here is the data that we are producing to KafkaSource topic:
> "str1", "2019-09-10 11:36:42"
> "str2", "2019-09-10 11:36:44"
> "str3", "2019-09-10 11:36:45"
>
> The first column name is "strCol".
> The second column, i.e. the timestamp in string format is being used as
> the timestamp of the element.
> The timestamp is the wall time when the record got generated.
> After publishing this data to the Kafka topic, we are not publishing any
> more data. The topic is idle after that.
> The timestamps of the records are monotonically increasing.
>
> Sql query: "select strCol FROM PCOLLECTION GROUP BY strCol"
>
> Here is the result from KafkaSink:
> {"strCol":{"string":"str1"}}
> {"strCol":{"string":"str3"}}
> {"strCol":{"string":"str2"}}
>
> The expected result is written to KafkaSink Correctly, *but with a delay*.
>
> Here are the logs from Spark Driver:
> ...
> 19/09/10 12:12:42 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:43:37.273Z,
> highWatermark=2019-09-10T11:43:37.273Z,
> synchronizedProcessingTime=2019-09-10T11:40:33.000Z}}
> ...
> 19/09/10 12:18:53 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:44:54.238Z,
> highWatermark=2019-09-10T11:44:54.238Z,
> synchronizedProcessingTime=2019-09-10T11:41:17.000Z}}
>
> As per the logs,
> when the processing time was 12:12:42, the highWatermark was at 11:43:37.
> Almost 30 minutes delay. And
> when the processing time was 12:18:53, the highWatermark was at 11:44:54.
>
> From the above logs, it seems that the watermark is moving slowly.
>
> Is there an IT for SparkRunner with Unbounded data and Windowing
> aggregation?
> Is this a known bug?
>
> Thanks,
> Rahul
>
> On Thu, Sep 5, 2019 at 7:48 PM shanta chakpram 
> wrote:
>
>> Hi,
>>
>> We have detected an issue with SparkRunner and Watermark.
>>
>> *Pipeline*: Read from two Kafka Sources => Apply fixed window of
>> duration 1 minute to both the PCollections => Apply SqlTransform with query
>> "select c.datetime, c.country ,s.name, s.id from `kafka_source1` as s
>> join `kafka_source2` as c on s.name = c.name" => write the emitted
>> output to Kafka Sink
>>
>> we are using the watermark provided in
>> https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74.
>> We have given maxDelay as 0.
>>
>> As we have applied fixed window of 1 minute duration and as the elements
>> timestamps are monotonically increasing, we are expecting the output to be
>> emitted when the current processing time crosses 12-02-00 with a reasonable
>> delay(say 10 seconds). But, we are getting the result of the window after a
>> long delay.
>>
>> In Spark logs it seems that the watermark is lagging.
>> Here are the logs:
>> 19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
>> {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
>> highWatermark=2019-09-05T11:57:06.302Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
>> 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
>> highWatermark=2019-09-05T11:57:06.686Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
>> 19/09/05 12:02:50 INFO
>> GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
>> timestamp: 1567684500500 has completed, watermarks have been updated.
>>
>> As you can see, when the current processing time is 12:02:50, the
>> highWatermark is 11:57:06.
>> As the processing time progresses, the gap between processing time and
>> highWatermark is increasing.
>>
>> We ran the same pipeline with same data in Flink Runner and Direct Runner
>> and we have not seen this issue. In these runners, we can see that the
>> Watermark is almost equal to Processing time.
>>
>> Sample Input Data :
>>
>> kafka_source1:
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
>> value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}
>>
>> kafka_source2:
>> value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05
>> 12-01-26 704060'}
>> value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05
>> 12-01-27 712300'}
>> value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05
>> 12-01-28 713951'}
>>
>> what can be the issue here?
>>
>> Regards,
>> shanta
>>
>


Re: Watermark is lagging in Spark Runner with kafkaIO

2019-09-10 Thread rahul patwari
Hi,
I am facing this issue too.
+dev 

Here is the Pipeline that we are using (a very simple pipeline to
highlight the issue):
KafkaSource -> SqlTransform -> KafkaSink

We are reading from a single topic in KafkaSource with a single partition.

Here is the data that we are producing to KafkaSource topic:
"str1", "2019-09-10 11:36:42"
"str2", "2019-09-10 11:36:44"
"str3", "2019-09-10 11:36:45"

The first column name is "strCol".
The second column, i.e. the timestamp in string format, is used as the
timestamp of the element.
The timestamp is the wall-clock time when the record was generated.
After publishing this data to the Kafka topic, we do not publish any more
data; the topic is idle after that.
The timestamps of the records are monotonically increasing.

Sql query: "select strCol FROM PCOLLECTION GROUP BY strCol"

Here is the result from KafkaSink:
{"strCol":{"string":"str1"}}
{"strCol":{"string":"str3"}}
{"strCol":{"string":"str2"}}

The expected result is written to the KafkaSink correctly, *but with a delay*.

Here are the logs from Spark Driver:
...
19/09/10 12:12:42 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-10T11:43:37.273Z,
highWatermark=2019-09-10T11:43:37.273Z,
synchronizedProcessingTime=2019-09-10T11:40:33.000Z}}
...
19/09/10 12:18:53 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-10T11:44:54.238Z,
highWatermark=2019-09-10T11:44:54.238Z,
synchronizedProcessingTime=2019-09-10T11:41:17.000Z}}

As per the logs,
when the processing time was 12:12:42, the highWatermark was at 11:43:37
(almost a 30-minute delay), and
when the processing time was 12:18:53, the highWatermark was at 11:44:54.

From the above logs, it seems that the watermark is moving slowly.

Is there an IT for SparkRunner with Unbounded data and Windowing
aggregation?
Is this a known bug?
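
For anyone trying to reproduce this, here is a condensed sketch of the
relevant part of the pipeline above (it assumes `rows` is the
PCollection<Row> with the single string column "strCol", already decoded
from the Kafka source with the element timestamps taken from the payload):

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

PCollection<Row> windowed =
    rows.apply("FixedWindow1Min",
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

PCollection<Row> aggregated =
    windowed.apply("GroupBySql",
        SqlTransform.query("select strCol FROM PCOLLECTION GROUP BY strCol"));
// `aggregated` is then written to the Kafka sink; the result only shows up
// once the runner's watermark passes the end of the 1-minute window.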

Thanks,
Rahul

On Thu, Sep 5, 2019 at 7:48 PM shanta chakpram 
wrote:

> Hi,
>
> We have detected an issue with SparkRunner and Watermark.
>
> *Pipeline*: Read from two Kafka Sources => Apply fixed window of duration
> 1 minute to both the PCollections => Apply SqlTransform with query "select
> c.datetime, c.country ,s.name, s.id from `kafka_source1` as s join
> `kafka_source2` as c on s.name = c.name" => write the emitted output to
> Kafka Sink
>
> we are using the watermark provided in
> https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74.
> We have given maxDelay as 0.
>
> As we have applied fixed window of 1 minute duration and as the elements
> timestamps are monotonically increasing, we are expecting the output to be
> emitted when the current processing time crosses 12-02-00 with a reasonable
> delay(say 10 seconds). But, we are getting the result of the window after a
> long delay.
>
> In Spark logs it seems that the watermark is lagging.
> Here are the logs:
> 19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
> highWatermark=2019-09-05T11:57:06.302Z,
> synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
> 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
> highWatermark=2019-09-05T11:57:06.686Z,
> synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
> 19/09/05 12:02:50 INFO
> GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
> timestamp: 1567684500500 has completed, watermarks have been updated.
>
> As you can see, when the current processing time is 12:02:50, the
> highWatermark is 11:57:06.
> As the processing time progresses, the gap between processing time and
> highWatermark is increasing.
>
> We ran the same pipeline with same data in Flink Runner and Direct Runner
> and we have not seen this issue. In these runners, we can see that the
> Watermark is almost equal to Processing time.
>
> Sample Input Data :
>
> kafka_source1:
> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
> value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}
>
> kafka_source2:
> value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05
> 12-01-26 704060'}
> value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05 12-01-27
> 712300'}
> value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05 12-01-28
> 713951'}
>
> what can be the issue here?
>
> Regards,
> shanta
>


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread rahul patwari
Hi Jan,

I was using Beam 2.13.0. I have upgraded to Beam 2.14.0 and the results
are now always correct. No more inconsistencies.

Does BEAM-7269 affect all the runners?

Thanks,
Rahul

On Fri, Aug 9, 2019 at 2:15 PM Jan Lukavský  wrote:

> Hi Rahul,
>
> what version of Beam are you using? There was a bug [1], which was fixed
> in 2.14.0. This bug could cause what you observe.
>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7269
> On 8/9/19 10:35 AM, rahul patwari wrote:
>
> Hi Robert,
>
> When PCollection is created using "Create.of(listOfRow)
> *.withCoder(RowCoder.of(schema))*", I am getting "Inconsistent" results.
> By "Inconsistent", I mean that the result is "Incorrect" sometimes(most of
> the times).
> By "Incorrect" result, I mean that the elements are missing. The elements
> are not duplicated. The elements are not batched differently.
>
> I have used System.identityHashcode(this) to convert PCollection to
> PCollection> to apply Stateful Pardo(GroupIntoBatches) as
> per your suggestion in this thread
> <https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>
>
> To verify the result, I have used GroupByKey, which should give the
> same result as GroupIntoBatches *for my case*.
>
> However, When PCollection is created using "Create.of(listOfRow)", the
> results are always correct.
>
> Regards,
> Rahul
>
> On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw 
> wrote:
>
>> Could you clarify what you mean by "inconsistent" and "incorrect"? Are
>> elements missing/duplicated, or just batched differently?
>>
>> On Fri, Aug 9, 2019 at 2:18 AM rahul patwari 
>> wrote:
>> >
>> > I only ran in Direct runner. I will run in other runners and let you
>> know the results.
>> > I am not setting "streaming" when executing.
>> >
>> > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:
>> >>
>> >> Have you tried running this on more than one runner (e.g. Dataflow,
>> Flink, Direct)?
>> >>
>> >> Are you setting --streaming when executing?
>> >>
>> >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I am getting inconsistent results when using GroupIntoBatches
>> PTransform.
>> >>> I am using Create.of() PTransform to create a PCollection from
>> in-memory. When a coder is given with Create.of() PTransform, I am facing
>> the issue.
>> >>> If the coder is not provided, the results are consistent and
>> correct(Maybe this is just a coincidence and the problem is at some other
>> place).
>> >>> If Batch Size is 1, results are always consistent.
>> >>>
>> >>> Not sure if this is an issue with Serialization/Deserialization (or)
>> GroupIntoBatches (or) Create.of() PTransform.
>> >>>
>> >>> The Java code, expected correct results, and inconsistent results are
>> available at https://github.com/rahul8383/beam-examples
>> >>>
>> >>> Thanks,
>> >>> Rahul
>>
>


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-09 Thread rahul patwari
Hi Robert,

When the PCollection is created using "Create.of(listOfRow)
*.withCoder(RowCoder.of(schema))*", I am getting "inconsistent" results.
By "inconsistent", I mean that the result is incorrect sometimes (most of
the time).
By "incorrect" result, I mean that elements are missing. The elements are
not duplicated, and they are not merely batched differently.

I have used System.identityHashCode(this) to convert the PCollection<Row>
into a PCollection<KV<Integer, Row>> so that the stateful ParDo
(GroupIntoBatches) can be applied, as per your suggestion in this thread
<https://lists.apache.org/thread.html/ed3344698db1bd107f2c2466f813e045056b62084806445fd54a61fc@%3Cdev.beam.apache.org%3E>

To verify the result, I have used GroupByKey, which should give the
same result as GroupIntoBatches *for my case*.

However, when the PCollection is created using "Create.of(listOfRow)"
(without the explicit coder), the results are always correct.
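
For reference, a condensed, self-contained sketch of what the failing case
does (the schema, values, and batch size are placeholders; the full code is
in the repository linked in the earlier message):

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class GroupIntoBatchesRepro {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("name").build();
    List<Row> listOfRow =
        Arrays.asList(
            Row.withSchema(schema).addValue("a").build(),
            Row.withSchema(schema).addValue("b").build(),
            Row.withSchema(schema).addValue("c").build());

    Pipeline pipeline = Pipeline.create();

    // Providing the coder explicitly is what triggered the inconsistent results
    // on 2.13.0; without .withCoder(...) the results were always correct.
    PCollection<Row> rows =
        pipeline.apply(Create.of(listOfRow).withCoder(RowCoder.of(schema)));

    PCollection<KV<Integer, Row>> keyed =
        rows.apply("KeyByDoFnInstance",
                ParDo.of(new DoFn<Row, KV<Integer, Row>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // Key by the DoFn instance so GroupIntoBatches has a key to batch on.
                    c.output(KV.of(System.identityHashCode(this), c.element()));
                  }
                }))
            .setCoder(KvCoder.of(VarIntCoder.of(), RowCoder.of(schema)));

    // Batches of 2; comparing against a plain GroupByKey over `keyed` is how the
    // missing elements were detected.
    keyed.apply(GroupIntoBatches.<Integer, Row>ofSize(2));

    pipeline.run().waitUntilFinish();
  }
}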

Regards,
Rahul

On Fri, Aug 9, 2019 at 1:05 PM Robert Bradshaw  wrote:

> Could you clarify what you mean by "inconsistent" and "incorrect"? Are
> elements missing/duplicated, or just batched differently?
>
> On Fri, Aug 9, 2019 at 2:18 AM rahul patwari 
> wrote:
> >
> > I only ran in Direct runner. I will run in other runners and let you
> know the results.
> > I am not setting "streaming" when executing.
> >
> > On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:
> >>
> >> Have you tried running this on more than one runner (e.g. Dataflow,
> Flink, Direct)?
> >>
> >> Are you setting --streaming when executing?
> >>
> >> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari <
> rahulpatwari8...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am getting inconsistent results when using GroupIntoBatches
> PTransform.
> >>> I am using Create.of() PTransform to create a PCollection from
> in-memory. When a coder is given with Create.of() PTransform, I am facing
> the issue.
> >>> If the coder is not provided, the results are consistent and
> correct(Maybe this is just a coincidence and the problem is at some other
> place).
> >>> If Batch Size is 1, results are always consistent.
> >>>
> >>> Not sure if this is an issue with Serialization/Deserialization (or)
> GroupIntoBatches (or) Create.of() PTransform.
> >>>
> >>> The Java code, expected correct results, and inconsistent results are
> available at https://github.com/rahul8383/beam-examples
> >>>
> >>> Thanks,
> >>> Rahul
>


Re: Inconsistent Results with GroupIntoBatches PTransform

2019-08-08 Thread rahul patwari
I only ran it on the Direct runner. I will run it on other runners and let
you know the results.
I am not setting --streaming when executing.

On Fri 9 Aug, 2019, 2:56 AM Lukasz Cwik,  wrote:

> Have you tried running this on more than one runner (e.g. Dataflow, Flink,
> Direct)?
>
> Are you setting --streaming when executing?
>
> On Thu, Aug 8, 2019 at 10:23 AM rahul patwari 
> wrote:
>
>> Hi,
>>
>> I am getting inconsistent results when using GroupIntoBatches PTransform.
>> I am using Create.of() PTransform to create a PCollection from in-memory.
>> When a coder is given with Create.of() PTransform, I am facing the issue.
>> If the coder is not provided, the results are consistent and
>> correct(Maybe this is just a coincidence and the problem is at some other
>> place).
>> If Batch Size is 1, results are always consistent.
>>
>> Not sure if this is an issue with Serialization/Deserialization (or)
>> GroupIntoBatches (or) Create.of() PTransform.
>>
>> The Java code, expected correct results, and inconsistent results are
>> available at https://github.com/rahul8383/beam-examples
>>
>> Thanks,
>> Rahul
>>
>


Inconsistent Results with GroupIntoBatches PTransform

2019-08-08 Thread rahul patwari
Hi,

I am getting inconsistent results when using the GroupIntoBatches
PTransform.
I am using the Create.of() PTransform to create a PCollection from
in-memory data. When a coder is given with the Create.of() PTransform, I am
facing the issue.
If the coder is not provided, the results are consistent and correct (maybe
this is just a coincidence and the problem is somewhere else).
If the batch size is 1, the results are always consistent.

I am not sure if this is an issue with serialization/deserialization,
GroupIntoBatches, or the Create.of() PTransform.

The Java code, expected correct results, and inconsistent results are
available at https://github.com/rahul8383/beam-examples

Thanks,
Rahul


Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-26 Thread rahul patwari
Thanks for your detailed explanation, Rui.

As you said, the triggers for the PCollections should be compatible with
the "Slowly Changing Lookup Cache" pattern.

Rui, if this feature makes sense, can you please create a JIRA for it?

I will start working on splitting BeamJoinRel.java into specific
implementations with SQL planner rules. I will also implement the "Slowly
Changing Lookup Cache" pattern with SQL planner rules.
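
In the meantime, here is a minimal sketch of the "Slowly Changing Lookup
Cache" side-input join written directly in Java (roughly what the planner
rule would generate); it assumes String join keys and String values, and
that `mainStream` and `lookupStream` are both PCollection<KV<String, String>>:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

// Re-window the lookup stream into the global window with a processing-time
// trigger, then materialize it as a multimap side input.
PCollectionView<Map<String, Iterable<String>>> lookupView =
    lookupStream
        .apply("GlobalWindowWithTrigger",
            Window.<KV<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(5))))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
        .apply(View.asMultimap());

PCollection<KV<String, KV<String, String>>> joined =
    mainStream.apply("SideInputJoin",
        ParDo.of(new DoFn<KV<String, String>, KV<String, KV<String, String>>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Map<String, Iterable<String>> lookup = c.sideInput(lookupView);
            Iterable<String> matches = lookup.get(c.element().getKey());
            if (matches != null) {
              for (String lookupValue : matches) {
                c.output(KV.of(c.element().getKey(),
                    KV.of(c.element().getValue(), lookupValue)));
              }
            }
          }
        }).withSideInputs(lookupView));

With discarding panes and the processing-time trigger, each main-stream
element sees the most recently triggered contents of the lookup stream,
which is the behavior the "Slowly Changing Lookup Cache" pattern describes.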

Thanks,
Rahul

On Sat 27 Jul, 2019, 1:58 AM Rui Wang,  wrote:

>
>
>> PCollection mainStream = ...
>> *PCollectionView>>* lookupStream = ...  // Note:
>> PCollectionView not PCollection. I have referred to PCollection before. And 
>> *PCollectionView
>> should be of type Multimap*, to perform SideinputJoin.
>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>> mainStream)).and(new TupleTag("LookupTable"), lookupStream);
>> //PCollectionTuple has to be enhanced to take PCollectionView also as an
>> argument.
>> tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));
>>
> and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
>> and a *PCollectionView*(instanceof check), SideInputJoin will be applied.
>>
>
> Yes, I am thinking something similar to it.
>
>
>
>> I think that performing SideInputJoin on two unbounded PCollections with
>> different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
>> Cache Pattern") is relatively straight forward if we take *PCollection*
>> itself as an argument for LookupTable in PCollectionTuple.
>>
> I think it's a hack in BeamJoinRel to check the WindowFn and perform a
> SideInputJoin when one side is unbounded with non-global windowing and the
> other side is unbounded with global windowing (and you likely also need to
> check triggers?). For SQL, if you really want to do it, you should do it with
> planner rules that match exactly the case you want to support and decouple
> this join implementation from BeamJoinRel.
>
> Even the current BeamJoinRel is too large and we should split it into
> different JoinRels to match different plans.
>
>
>
>> The conversion of PCollection to PCollectionView is hidden for the user
>> in this case(Which will be performed internally by SideInputJoin).
>> Moreover, if the user wants to perform some SQL Aggregations on
>> "lookupStream" before performing Join with "mainStream"(Multiple SQL
>> Queries separated by ";"), it is possible in this case, as the
>> "lookupStream" is a PCollection. But, it is not possible if the
>> "lookupStream" is a PCollectionView.
>>
> It's true that PCollectionView will limit further SQL operations. The
> workaround is to do those operations in Java before using SqlTransform, and
> within SqlTransform, start with the Join.
>
>
> So if your use case is to support general SQL operations on two unbounded
> PCollections, but with a special need to perform a SideInput join for these
> two unbounded PCollections with a special WindowFn (and maybe even trigger)
> check, then the best way is to define SQL plan rules and have a separate Rel
> implementation.
>
>
>
> -Rui
>
>
>
>
>> Regards,
>> Rahul
>>
>> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang  wrote:
>>
>>> I see.
>>>
>>> Actually I was still referring to making "LookupStream" a PCollectionView
>>> to perform a sideinput join, which then doesn't have the mismatched
>>> WindowFn problem. Otherwise, we shouldn't check a special case of WindowFn
>>> to decide whether to perform a sideinput join for two unbounded
>>> PCollections when their WindowFns do not match.
>>>
>>> And what "data completeness" really means is that the sideinput is
>>> triggered so it could change, and then the question is: when the sideinput
>>> is changed, should we refine previously emitted data? It becomes harder to
>>> reason about at this moment.
>>>
>>>
>>> Rui
>>>
>>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <
>>> rahulpatwari8...@gmail.com> wrote:
>>>
>>>> "*In terms of Join schematic, I think it's hard to reason data
>>>> completeness since one side of the join is changing*"
>>>> - As it is possible to apply [Global Windows with Non-Default Trigger]
>>>> to Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>>>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>>>> the condition that one of the PCollection being Joined have WindowFn as
>>>> [Global W

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-26 Thread rahul patwari
Is this the flow that you are referring to:

PCollection mainStream = ...
*PCollectionView>>* lookupStream = ...  // Note:
PCollectionView not PCollection. I have referred to PCollection
before. And *PCollectionView
should be of type Multimap*, to perform SideinputJoin.
PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
mainStream).and(new TupleTag("LookupTable"), lookupStream);
//PCollectionTuple has to be enhanced to take PCollectionView also as an
argument.
tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));

and in BeamJoinRel.java, when a Join has to be performed on a *PCollection*
and a *PCollectionView* (instanceof check), a SideInputJoin will be applied.

I think that performing a SideInputJoin on two unbounded PCollections with
different WindowFns (and satisfying the criteria for the "Slowly Changing
Lookup Cache" pattern) is relatively straightforward if we take the
*PCollection* itself as the argument for LookupTable in PCollectionTuple. The
conversion of the PCollection to a PCollectionView is hidden from the user in
this case (it will be performed internally by the SideInputJoin). Moreover, if
the user wants to perform some SQL aggregations on "lookupStream" before
performing the Join with "mainStream" (multiple SQL queries separated by ";"),
it is possible in this case, as the "lookupStream" is a PCollection. But it is
not possible if the "lookupStream" is a PCollectionView.

Regards,
Rahul

On Fri, Jul 26, 2019 at 9:19 AM Rui Wang  wrote:

> I see.
>
> Actually I was still referring to making "LookupStream" a PCollectionView
> to perform a sideinput join, which then doesn't have the mismatched WindowFn
> problem. Otherwise, we shouldn't check a special case of WindowFn to decide
> whether to perform a sideinput join for two unbounded PCollections when their
> WindowFns do not match.
>
> And what "data completeness" really means is that the sideinput is triggered
> so it could change, and then the question is: when the sideinput is changed,
> should we refine previously emitted data? It becomes harder to reason about
> at this moment.
>
>
> Rui
>
> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari 
> wrote:
>
>> "*In terms of Join schematic, I think it's hard to reason data
>> completeness since one side of the join is changing*"
>> - As it is possible to apply [Global Windows with Non-Default Trigger] to
>> Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>> the condition that one of the PCollection being Joined have WindowFn as
>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>> pastFirstElementInPane())] is it sufficient to perform the Join of
>> "MainStream" and this "LookupStream"?
>>
>> In other words, I mean to say that instead of directly throwing Exception
>> <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359>
>>  when
>> Joining two Unbounded PCollections with different WindowFns, If we can
>> ensure that
>> MainStream: one side of the join is Unbounded with WindowFn as
>> [Non-Global Windows with DefaultTrigger] and
>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>> pastFirstElementInPane()) Trigger],
>> we can directly perform a SideInputJoin.
>>
>> Will we have "data completeness" problem even in "Slowly Changing lookup
>> Cache Pattern"?
>>
>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang  wrote:
>>
>>> To be more clear, I think it's useful if we can achieve the following
>>> that you wrote
>>>
>>> PCollection mainStream = ...;
>>> PCollection lookupStream = ...;
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> new TupleTag("LookupTable"));
>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>
>>> -Rui
>>>
>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang  wrote:
>>>
>>>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes
>>>> the slow changing table join problem.
>>>>
>>>> To your question: "Can we implement SideInputJoin for this case",
>>>> there are two perspectives.
>>>>
>>>> In terms of implementing the slowing changing lookup cache pattern
>>>> <https://beam.apache.org/do

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-25 Thread rahul patwari
"*In terms of Join schematic, I think it's hard to reason data completeness
since one side of the join is changing*"
- As it is possible to apply [Global Windows with a Non-Default Trigger] to an
unbounded data source, say Kafka, to distinguish that Kafka PCollection from
the "Slowly Changing Lookup Cache" unbounded PCollection: if we can check the
condition that one of the PCollections being joined has a WindowFn of
[Global Windows with the Trigger
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())], is it
sufficient to perform the Join of "MainStream" and this "LookupStream"?

In other words, I mean to say that instead of directly throwing an Exception
<https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359>
when joining two unbounded PCollections with different WindowFns, if we can
ensure that
MainStream: one side of the join is unbounded with a WindowFn of
[Non-Global Windows with the DefaultTrigger], and
LookupStream: the other side of the join is a "Slowly Changing Lookup Cache"
[Global Windows with a
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()) trigger],
then we can directly perform a SideInputJoin.

Will we have a "data completeness" problem even with the "Slowly Changing
Lookup Cache" pattern?

On Fri, Jul 26, 2019 at 2:51 AM Rui Wang  wrote:

> To be more clear, I think it's useful if we can achieve the following that
> you wrote
>
> PCollection mainStream = ...;
> PCollection lookupStream = ...;
> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
> new TupleTag("LookupTable"));
> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>
> -Rui
>
> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang  wrote:
>
>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
>> slow changing table join problem.
>>
>> To your question: "Can we implement SideInputJoin for this case", there
>> are two perspectives.
>>
>> In terms of implementing the slowing changing lookup cache pattern
>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs>
>>  in
>> BeamSQL, such sidinput join can be done that way. At least it worth
>> exploring it until we identify blockers. I also think this pattern is
>> already useful to users.
>>
>> In terms of Join schematic, I think it's hard to reason data completeness
>> since one side of join is changing.
>>
>> -Rui
>>
>>
>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>>
>>> Hi Kenn,
>>>
>>> If we consider the following two *Unbounded* PCollections:
>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>> coincidentally turned out to be the opposite
>>>
>>> Joining these two PCollections in BeamSql currently is not possible
>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>> Mismatch)
>>> But in this case, PCollection1 can be joined with PCollection2 using
>>> SideInputJoin (
>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>> which is being done for Joining an Unbounded PCollection with Bounded
>>> PCollection. I am thinking that Beam can guarantee it joins all input
>>> elements once per window for this case.
>>> The result of the join might be fuzzy for the window when the Trigger
>>> for PCollection2 fires and sideinput gets loaded into Memory.
>>>
>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>> BeamSql can support Pattern:
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>> which is currently not possible.
>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>> TableProvider.
>>>
>>> If we can support this, User will be able to do:
>>> PCollection mainStream = ...;
>>> PCollection lookupStream = ...;
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> new TupleTag("LookupTable&quo

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-25 Thread rahul patwari
Hi Kenn,

If we consider the following two *Unbounded* PCollections:
- PCollection1 => [*Non-Global* Window with Default Trigger]
- PCollection2 => [Global Window with *Non-Default* Trigger] :)
coincidentally turned out to be the opposite

Joining these two PCollections in BeamSql currently is not possible because
of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn Mismatch)
But in this case, PCollection1 can be joined with PCollection2 using
SideInputJoin (
https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
which is being done for Joining an Unbounded PCollection with Bounded
PCollection. I am thinking that Beam can guarantee it joins all input
elements once per window for this case.
The result of the join might be fuzzy for the window when the Trigger for
PCollection2 fires and sideinput gets loaded into Memory.

PCollection2 can be considered as Slowly Changing Lookup Cache and BeamSql
can support Pattern:
https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
which is currently not possible.
I am working on https://jira.apache.org/jira/browse/BEAM-7758 for BeamSql
to natively support PCollectionView so that BeamSql supports "Slowly
Updating Global Window Sideinput Pattern" using SqlTransform's
TableProvider.

If we can support this, User will be able to do:
PCollection mainStream = ...;
PCollection lookupStream = ...;
PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"), new
TupleTag("LookupTable"));
tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));

Can we implement SideInputJoin for this case?
I might be wrong in my understanding. Please let me know your thoughts.

Thanks,
Rahul

On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles  wrote:

> I think the best way to approach this is probably to have an example SQL
> statement and to discuss what the relational semantics should be.
>
> Windowing is not really part of SQL (yet) and in a way it just needs very
> minimal extensions. See https://arxiv.org/abs/1905.12133. In this
> proposal for SQL, windowed aggregation is explicitly be part of the GROUP
> BY operation, where you GROUP BY window columns that were added. So it is
> more explicit than in Beam. Relations do not have a WindowFn so there is no
> problem of them being incompatible.
>
> With Beam SQL there are basically two ways of windowing that work totally
> differently:
>
> 1. SQL style windowing where you GROUP BY windows. This does not use the
> input PCollection windowfn
> 2. PCollection windowing where the SQL does not do any windowing - this
> should apply the SQL expression to each window independently
>
> In order to support a hybrid of these, it might be:
>
> 3. SQL style windowing, where when a PCollection has window assigned, the
> window columns are added before the SQL is applied. It is a bit strange but
> might enable your use.
>
> Kenn
>
> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari 
> wrote:
>
>> Hi,
>>
>> Beam currently doesn't support Join of Unbounded PCollections of
>> different WindowFns (
>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>> ).
>>
>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>
>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when
>> one of the Unbounded PCollection has [GlobalWindows Applied with
>> Non-Default Trigger(probably a slow-changing lookup cache
>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>> by performing 'SideInputJoin'?
>>
>> Regards,
>> Rahul
>>
>
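
For reference, a minimal sketch of the kind of SideInputJoin discussed in this
thread, written by hand outside SQL: the slowly changing lookup PCollection is
materialized as a multimap side input and joined inside a ParDo. The element
types (KV<String, String>), the join key, and the output shape are assumptions
for illustration:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputJoinSketch {
  public static PCollection<KV<String, KV<String, String>>> join(
      PCollection<KV<String, String>> mainStream,      // e.g. non-global windows, default trigger
      PCollection<KV<String, String>> lookupStream) {  // e.g. global window, repeated trigger
    // Materialize the lookup side as a multimap side input keyed by the join key.
    final PCollectionView<Map<String, Iterable<String>>> lookupView =
        lookupStream.apply(View.asMultimap());

    return mainStream.apply(
        ParDo.of(
                new DoFn<KV<String, String>, KV<String, KV<String, String>>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Map<String, Iterable<String>> lookup = c.sideInput(lookupView);
                    Iterable<String> matches = lookup.get(c.element().getKey());
                    if (matches != null) {
                      for (String rhs : matches) {
                        c.output(KV.of(c.element().getKey(), KV.of(c.element().getValue(), rhs)));
                      }
                    }
                  }
                })
            .withSideInputs(lookupView));
  }
}

Supporting this natively in BeamSQL would essentially hide the View.asMultimap()
conversion and this ParDo behind the planner rule discussed above.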


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
Yes. But GroupIntoBatches works on KVs, and we are working with
PCollection<Row> throughout our pipeline.
We can convert each Row to a KV. But we only have a few keys and a bounded
PCollection. As we have global windows and only a few keys, the opportunity for
parallelism is limited to [No. of keys] with Stateful ParDo [per key, per
window] processing.

On Thu, Jul 25, 2019 at 10:08 PM Reuven Lax  wrote:

> Have you looked at the GroupIntoBatches transform?
>
> On Thu, Jul 25, 2019 at 9:34 AM rahul patwari 
> wrote:
>
>> So, If an RPC call has to be performed for a batch of
>> Rows(PCollection), instead of each Row, the recommended way is to
>> batch the Rows in startBundle() of DoFn(
>> https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
>> I thought Stateful and Timely Processing could be helpful here.
>>
>> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw 
>> wrote:
>>
>>> Though it's not obvious in the name, Stateful ParDos can only be
>>> applied to keyed PCollections, similar to GroupByKey. (You could,
>>> however, assign every element to the same key and then apply a
>>> Stateful DoFn, though in that case all elements would get processed on
>>> the same worker.)
>>>
>>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>>>  wrote:
>>> >
>>> > Hi,
>>> >
>>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> gives an example of assigning an arbitrary-but-consistent index to each
>>> element on a per key-and-window basis.
>>> >
>>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>>> PCollection with Fixed Windows, the state is maintained per window and
>>> every element in the window will be assigned a consistent index?
>>> > Does this mean every element belonging to the window will be processed
>>> in a single DoFn Instance, which otherwise could have been done in multiple
>>> parallel instances, limiting performance?
>>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
>>> PCollection?
>>> >
>>> > Thanks,
>>> > Rahul
>>>
>>
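
For reference, a minimal sketch of one way to use GroupIntoBatches on a
PCollection<Row> while keeping some parallelism: assign an arbitrary shard key,
batch per shard (and per window), then drop the key. The shard count and the
random key assignment are assumptions for illustration:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BatchRowsSketch {
  private static final int NUM_SHARDS = 10; // assumed shard count, tune for the runner

  public static PCollection<Iterable<Row>> inBatches(PCollection<Row> rows, int batchSize) {
    // Assign an arbitrary shard key so that GroupIntoBatches has a KV input;
    // more shards give more room for parallelism than a handful of natural keys.
    SerializableFunction<Row, Integer> assignShard =
        row -> ThreadLocalRandom.current().nextInt(NUM_SHARDS);
    return rows
        .apply(WithKeys.of(assignShard).withKeyType(TypeDescriptors.integers()))
        // Batch per shard key (and per window).
        .apply(GroupIntoBatches.<Integer, Row>ofSize(batchSize))
        // Drop the artificial key and keep only the batches.
        .apply(Values.create());
  }
}

The trade-off is that batches are no longer grouped by any business key; the
artificial key only exists to spread the batching work across workers.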


Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
So, If an RPC call has to be performed for a batch of
Rows(PCollection), instead of each Row, the recommended way is to
batch the Rows in startBundle() of DoFn(
https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
I thought Stateful and Timely Processing could be helpful here.

On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw  wrote:

> Though it's not obvious in the name, Stateful ParDos can only be
> applied to keyed PCollections, similar to GroupByKey. (You could,
> however, assign every element to the same key and then apply a
> Stateful DoFn, though in that case all elements would get processed on
> the same worker.)
>
> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>  wrote:
> >
> > Hi,
> >
> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives
> an example of assigning an arbitrary-but-consistent index to each element
> on a per key-and-window basis.
> >
> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
> PCollection with Fixed Windows, the state is maintained per window and
> every element in the window will be assigned a consistent index?
> > Does this mean every element belonging to the window will be processed
> in a single DoFn Instance, which otherwise could have been done in multiple
> parallel instances, limiting performance?
> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
> PCollection?
> >
> > Thanks,
> > Rahul
>
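
For reference, a minimal sketch of the bundle-based batching referred to above
(initialize a buffer in @StartBundle, accumulate in @ProcessElement, flush in
@FinishBundle). The callBatchedRpc() helper is a placeholder for the actual
RPC, not a real API:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

// Buffers Rows within a bundle and sends them as one batched call when the bundle finishes.
class BatchedRpcFn extends DoFn<Row, Row> {
  private transient List<Row> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element Row row, OutputReceiver<Row> out) {
    buffer.add(row);
    out.output(row); // pass the element through; handling of the RPC response is elided
  }

  @FinishBundle
  public void finishBundle() {
    if (!buffer.isEmpty()) {
      callBatchedRpc(buffer); // one call per bundle instead of one per element
      buffer.clear();
    }
  }

  // Placeholder for the user's batched RPC; an assumed helper, not a real Beam or service API.
  private void callBatchedRpc(List<Row> batch) {
    // e.g. serviceClient.send(batch);
  }
}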


Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread rahul patwari
Hi,

https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an
example of assigning an arbitrary-but-consistent index to each element on a
per key-and-window basis.

If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
PCollection with Fixed Windows, the state is maintained per window and
every element in the window will be assigned a consistent index?
Does this mean every element belonging to the window will be processed in a
single DoFn Instance, which otherwise could have been done in multiple
parallel instances, limiting performance?
Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?

Thanks,
Rahul


Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-22 Thread rahul patwari
Hi,

Beam currently doesn't support Join of Unbounded PCollections of different
WindowFns (
https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
).

BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
performing 'SideInputJoin' with Bounded PCollection as a SideInput.

Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when
one of the Unbounded PCollection has [GlobalWindows Applied with
Non-Default Trigger(probably a slow-changing lookup cache
https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
by performing 'SideInputJoin'?

Regards,
Rahul


Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-18 Thread rahul patwari
The fact that a Bounded vs. Unbounded JOIN is performed by treating the
bounded PCollection as a SideInput means that the bounded PCollection should
fit into memory. Am I right? In that case, a bounded PCollection from Hive (or)
HDFS, where the data might not fit into memory, cannot be JOINed with Kafka?

Does this approach have something to do with the watermark? The computation
might take time depending on the size of the bounded data, and the window
might expire before the result for the window is emitted.

Thanks,
Rahul

On Thu, Jul 18, 2019 at 10:13 PM Rui Wang  wrote:

> The idea is that the slowly changing table is treated as a PCollectionView,
> which leads to a sideinput join implementation in which you join an unbounded
> windowed stream (Kafka) with a trigger-based sideinput (the slowly changing
> data). That's how it follows the pattern. If you are considering the case of
> joining windowed data with windowed data, in which one side of the data is
> slowly changing, it sounds like you only need to make the PCollectionView
> windowed as you need (fixed windowing, for example; also need to double check
> whether a sideinput can be triggered on a non-global window). In this case
> the windowing strategy seems to have to be consistent for both sides.
>
> In BeamSQL, if one side of a binary join is a bounded PCollection, BeamSQL
> already constructs a sideinput join (you can check [1] for implementation
> details). It's even more straightforward to do: if one side of the join is a
> PCollectionView, go with a sideinput join.
>
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L185
>
> -Rui
>
> On Thu, Jul 18, 2019 at 7:52 AM rahul patwari 
> wrote:
>
>> Hi Rui,
>>
>> I have a query about BEAM-7758.
>> If [Pattern: slowly changing lookup cache] is followed while defining and
>> constructing the lookup table, and it is set with SqlTransform, then if any
>> aggregation (JOIN) needs to be performed, say, between the windowed Kafka
>> PCollection table and the lookup table, the aggregation cannot be done
>> unless both the PCollections have matching WindowFns, as they are unbounded.
>> What can be done to treat the lookup table as a bounded PCollection and
>> perform the aggregation with every window of Kafka's PCollection?
>>
>> Thanks,
>> Rahul
>>
>>
>> On Wed, Jul 17, 2019 at 1:06 AM Rui Wang  wrote:
>>
>>> Another approach is to let BeamSQL support it natively, as the title of
>>> this thread says: "as a Table in BeamSQL".
>>>
>>> We might be able to define a table with properties that says this table
>>> return a PCollectionView. By doing so we will have a trigger based
>>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>>> only need to construct a table and set it to SqlTransform
>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>>> *. *
>>>
>>> Create a JIRA to track this idea:
>>> https://jira.apache.org/jira/browse/BEAM-7758
>>>
>>>
>>> -Rui
>>>
>>>
>>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni  wrote:
>>>
>>>> Hi Rahul,
>>>>
>>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>>> code example )
>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>>
>>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>>> to manually call BigQuery using the BigQuery client.
>>>>
>>>> Regards
>>>>
>>>> Reza
>>>>
>>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>>
>>>>> We have a use case to read slowly changing bounded data as a
>>>>> PCollection along with the main PCollection from Kafka(windowed) and use 
>>>>> it
>>>>> in the query of BeamSql.
>>>>>
>>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>>
>>>>> Approaches followed but not Successful:
>>>>>
>>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>

Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-17 Thread rahul patwari
Hi,

Please add me as a contributor to the Beam Issue Tracker. I would like to
work on this feature. My ASF Jira Username: "rahul8383"

Thanks,
Rahul



On Wed, Jul 17, 2019 at 1:06 AM Rui Wang  wrote:

> Another approach is to let BeamSQL support it natively, as the title of
> this thread says: "as a Table in BeamSQL".
>
> We might be able to define a table with properties that says this table
> return a PCollectionView. By doing so we will have a trigger based
> PCollectionView available in SQL rel nodes, thus SQL will be able to
> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
> only need to construct a table and set it to SqlTransform
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
> *. *
>
> Create a JIRA to track this idea:
> https://jira.apache.org/jira/browse/BEAM-7758
>
>
> -Rui
>
>
> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni  wrote:
>
>> Hi Rahul,
>>
>> FYI, that patterns is also available in the Beam docs  ( with updated
>> code example )
>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>
>> Please note in the DoFn that feeds the View.asSingleton() you will need
>> to manually call BigQuery using the BigQuery client.
>>
>> Regards
>>
>> Reza
>>
>> On Tue, 16 Jul 2019 at 14:37, rahul patwari 
>> wrote:
>>
>>> Hi,
>>>
>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> We have a use case to read slowly changing bounded data as a PCollection
>>> along with the main PCollection from Kafka(windowed) and use it in the
>>> query of BeamSql.
>>>
>>> Is it possible to design such a use case with Beam Java SDK?
>>>
>>> Approaches followed but not Successful:
>>>
>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>> Transform(which applies Beam I/O on the
>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>> to PCollection Apply BeamSQL
>>> Comments: Beam I/O reads data only once even though a long value is
>>> generated from GenerateSequece with periodicity. The expectation is that
>>> whenever a long value is generated, Beam I/O will be used to read the
>>> latest data. Is this because of optimizations in the DAG? Can the
>>> optimizations be overridden?
>>>
>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>> the PCollection.
>>> comments: The output PCollection is unbounded. But, we need a bounded
>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>> PCollection inside a DoFn?
>>>
>>> Are there any better Approaches?
>>>
>>> Regards,
>>> Rahul
>>>
>>>
>>>
>>
>


Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-16 Thread rahul patwari
Hi Reza, Rui,

Can we use the [slowly changing lookup cache] approach in BeamSQL if the
source is [HDFS (or) Hive] (with changing data), where the PCollection cannot
fit into memory?
This PCollection will be JOINed with a windowed PCollection created from
reading data from Kafka in BeamSQL.

Thanks and Regards,
Rahul

On Wed, Jul 17, 2019 at 3:07 AM Reza Rokni  wrote:

> +1
>
> On Tue, 16 Jul 2019 at 20:36, Rui Wang  wrote:
>
>> Another approach is to let BeamSQL support it natively, as the title of
>> this thread says: "as a Table in BeamSQL".
>>
>> We might be able to define a table with properties that says this table
>> return a PCollectionView. By doing so we will have a trigger based
>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>> only need to construct a table and set it to SqlTransform
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>> *. *
>>
>> Create a JIRA to track this idea:
>> https://jira.apache.org/jira/browse/BEAM-7758
>>
>>
>> -Rui
>>
>>
>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni  wrote:
>>
>>> Hi Rahul,
>>>
>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>> code example )
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>
>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>> to manually call BigQuery using the BigQuery client.
>>>
>>> Regards
>>>
>>> Reza
>>>
>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> We have a use case to read slowly changing bounded data as a
>>>> PCollection along with the main PCollection from Kafka(windowed) and use it
>>>> in the query of BeamSql.
>>>>
>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>
>>>> Approaches followed but not Successful:
>>>>
>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>> Transform(which applies Beam I/O on the
>>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>>> to PCollection Apply BeamSQL
>>>> Comments: Beam I/O reads data only once even though a long value is
>>>> generated from GenerateSequece with periodicity. The expectation is that
>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>> optimizations be overridden?
>>>>
>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>> the PCollection.
>>>> comments: The output PCollection is unbounded. But, we need a bounded
>>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>>> PCollection inside a DoFn?
>>>>
>>>> Are there any better Approaches?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>>
>>>>
>>>
>>
>


Slowly changing lookup cache as a Table in BeamSql

2019-07-16 Thread rahul patwari
Hi,

we are following [*Pattern: Slowly-changing lookup cache*] from
https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

We have a use case to read slowly changing bounded data as a PCollection
along with the main PCollection from Kafka(windowed) and use it in the
query of BeamSql.

Is it possible to design such a use case with Beam Java SDK?

Approaches followed but not Successful:

1) GenerateSequence => GlobalWindow with Data Trigger => Composite
Transform (which applies Beam I/O on the
pipeline [PCollection.getPipeline()]) => Convert the resulting PCollection
to PCollection<Row> => Apply BeamSQL
Comments: Beam I/O reads data only once even though a long value is
generated from GenerateSequence periodically. The expectation is that
whenever a long value is generated, Beam I/O will be used to read the
latest data. Is this because of optimizations in the DAG? Can the
optimizations be overridden?

2) The pipeline is the same as approach 1. But, instead of using a
composite transform, a DoFn is used where a for loop will emit each Row of
the PCollection.
comments: The output PCollection is unbounded. But, we need a bounded
PCollection as this PCollection is used to JOIN with PCollection of each
window from Kafka. How can we convert an Unbounded PCollection to Bounded
PCollection inside a DoFn?

Are there any better Approaches?

Regards,
Rahul
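
For reference, a minimal sketch of the slowly-updating global-window side
input pattern discussed in this thread, roughly approach 1 above expressed as
a side input rather than a composite transform. The 5-minute refresh interval
and the readLookupTable() helper are assumptions; the helper stands in for the
actual Beam I/O or client call that re-reads the latest data on every tick:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

class SlowlyChangingSideInputSketch {
  static PCollectionView<Map<String, String>> lookupView(PBegin begin) {
    return begin
        // Emit a tick every 5 minutes (assumed refresh interval).
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
        // Stay in the global window but fire on every tick, keeping only the latest pane.
        .apply(
            Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        // On each tick, re-read the lookup data (placeholder: call BigQuery/Hive/etc. here).
        .apply(
            ParDo.of(
                new DoFn<Long, Map<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(readLookupTable());
                  }
                }))
        // Expose the most recently fired pane as a singleton side input.
        .apply(View.asSingleton());
  }

  // Placeholder for the user's lookup source; an assumed helper, not a real API.
  private static Map<String, String> readLookupTable() {
    Map<String, String> lookup = new HashMap<>();
    lookup.put("key", "value");
    return lookup;
  }
}

The resulting PCollectionView can then be passed to a ParDo via
withSideInputs(), which is essentially what the BEAM-7758 discussion above
proposes to expose through SqlTransform's TableProvider.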


Re: PR#6675 Updates

2019-07-06 Thread rahul patwari
On Fri 5 Jul, 2019, 9:25 PM Ismaël Mejía,  wrote:

>  This is a holiday week in the US and a good chunk of the people in
> the project have been busy between Beam summit and other events in the
> last days, this is why reviews are taking longer than expected. Sorry,
> next week most things will be back to normal (hopefully).
>
> On Fri, Jul 5, 2019 at 10:27 AM Sehrish Naeem
>  wrote:
> >
> > Hi,
> >
> > My name is Sehrish Naeem, can someone please review the PR of BEAM-6675?
> >
> > Thank you
>


Re: Custom Watermark Instance being created multiple times for KafkaIO

2019-05-22 Thread rahul patwari
Hi Lukasz,

There was a bug in my code. When the topic is idle, I indeed get the watermark
as (now - maxDelay).

I have a few questions:
I have created a static int variable in my watermark class and incremented
the variable inside the constructor. I ran the pipeline in SparkRunner for
approximately 3 minutes, and in the logs I see that the counter value is
around 800, which implies that my watermark class was instantiated 800
times.

The previousWatermark javadoc[1] suggests that it is the latest checkpointed
watermark. Does this mean the watermark is checkpointed along with the
partition offsets? As the counter value is around 800, does this mean the
watermark is checkpointed 800 times? Does this mean a new instance of my
watermark class will be created after every checkpoint?

1:
https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L49


Thanks,
Rahul

On Thu, May 23, 2019 at 12:23 AM Lukasz Cwik  wrote:

>
>
> On Wed, May 22, 2019 at 11:17 AM rahul patwari 
> wrote:
>
>> will watermark also get checkpointed by default along with the offset of
>> the partition?
>>
>> We have found a limitation for CustomTimestampPolicyWithLimitedDelay. 
>> Consider
>> this scenario:
>> If we are processing a stream of events from Kafka with event timestamps
>> older than the current processing time(say 1 day), and If I set maxDelay as
>> 1 day, when the topic is idle(for some time), watermark will advance to the
>> current time, thereby discarding any data which arrives later in the
>> pipeline(as the event timestamps are 1 day old) considering them as late.
>>
>
> This seems like a bug since the watermark should only advance to
> currentTime - maxDelay if the topic is empty based upon the
> CustomTimestampPolicyWithLimitedDelay javadoc[1].
>
> Also, can we use an instance variable(to calculate idle time in Kafka
>> topic and advance watermark accordingly, instead of moving the watermark to
>> the current time) which cannot be checkpointed, for the class which
>> implements the createTimestampPolicy method in TimestampPolicyFactory
>> interface?
>>
>> Regards,
>> Rahul
>>
>> On Wed, May 22, 2019 at 9:04 AM rahul patwari 
>> wrote:
>>
>>> Hi,
>>>
>>> We are using the withTimestampPolicyFactory(TimestampPolicyFactory<K, V>
>>> timestampPolicyFactory) method in KafkaIO.Read
>>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->,
>>> where we have written a lambda for
>>> createTimestampPolicy(org.apache.kafka.common.TopicPartition tp,
>>> java.util.Optional<Instant> previousWatermark)
>>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.
>>>
>>> Sample code:
>>> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>>     KafkaIO.<String, GenericRecord>read()
>>>         .withBootstrapServers(bootstrapServerUrl).withTopic(topicName)
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>>>             AvroCoder.of(GenericRecord.class, avroSchema))
>>>         .withTimestampPolicyFactory((tp, prevWatermark) -> new
>>>             KafkaCustomTimestampPolicy(prevWatermark));
>>>
>>> The topic we are reading from only has one partition.
>>> In the lifecycle of the pipeline, KafkaCustomTimestampPolicy instance
>>> is being created multiple times.
>>>
>>> Is there any documentation describing the guidelines one should follow
>>> when implementing custom watermark?
>>>
>>
> Javadoc for getWatermark on unbounded sources[2].
>
>
>> How does checkpointing affect the watermark?
>>>
>>
> The watermark doesn't advance while a source is checkpointed and not being
> actively processed. The watermark only advan

Re: Custom Watermark Instance being created multiple times for KafkaIO

2019-05-22 Thread rahul patwari
Will the watermark also get checkpointed by default along with the offset of
the partition?

We have found a limitation with CustomTimestampPolicyWithLimitedDelay. Consider
this scenario:
If we are processing a stream of events from Kafka with event timestamps
older than the current processing time (say, 1 day), and if I set maxDelay to
1 day, then when the topic is idle (for some time) the watermark will advance
to the current time, thereby discarding any data which arrives later in the
pipeline (as the event timestamps are 1 day old), considering it late.

Also, can we use an instance variable (to calculate the idle time of the Kafka
topic and advance the watermark accordingly, instead of moving the watermark
to the current time) which is not checkpointed, in the class which implements
the createTimestampPolicy method of the TimestampPolicyFactory interface?

Regards,
Rahul

On Wed, May 22, 2019 at 9:04 AM rahul patwari 
wrote:

> Hi,
>
> We are using the withTimestampPolicyFactory(TimestampPolicyFactory<K, V>
> timestampPolicyFactory) method in KafkaIO.Read
> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->,
> where we have written a lambda for
> createTimestampPolicy(org.apache.kafka.common.TopicPartition tp,
> java.util.Optional<Instant> previousWatermark)
> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.
>
> Sample code:
> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>     KafkaIO.<String, GenericRecord>read()
>         .withBootstrapServers(bootstrapServerUrl).withTopic(topicName)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>             AvroCoder.of(GenericRecord.class, avroSchema))
>         .withTimestampPolicyFactory((tp, prevWatermark) -> new
>             KafkaCustomTimestampPolicy(prevWatermark));
>
> The topic we are reading from only has one partition.
> In the lifecycle of the pipeline, KafkaCustomTimestampPolicy instance is
> being created multiple times.
>
> Is there any documentation describing the guidelines one should follow
> when implementing custom watermark?
> How does checkpointing affect the watermark?
>
> StackTrace from constructor of KafkaCustomTimestampPolicy:
>
>  
> com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41),
>  
> com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99),(KafkaIO.Read
> instance is created here)
>
>  
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536),
>
>  
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126),
>
>  
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43),
>
>  
> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226),
>
>  
> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132),
>
>  
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
>
>  
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
>  java.util.concurrent.FutureTask.run(FutureTask.java:266),
>
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
>
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
>  java.lang.Thread.run(Thread.java:748)
>


Custom Watermark Instance being created multiple times for KafkaIO

2019-05-21 Thread rahul patwari
Hi,

We are using the withTimestampPolicyFactory(TimestampPolicyFactory<K, V>
timestampPolicyFactory) method in KafkaIO.Read
<https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html>,
where we have written a lambda for
createTimestampPolicy(org.apache.kafka.common.TopicPartition tp,
java.util.Optional<Instant> previousWatermark), with Instant being joda-time's
<http://www.joda.org/joda-time/apidocs/org/joda/time/Instant.html?is-external=true>.

Sample code:
KafkaIO.Read<String, GenericRecord> kafkaIoRead =
    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(bootstrapServerUrl).withTopic(topicName)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
            AvroCoder.of(GenericRecord.class, avroSchema))
        .withTimestampPolicyFactory((tp, prevWatermark) -> new
            KafkaCustomTimestampPolicy(prevWatermark));

The topic we are reading from only has one partition.
In the lifecycle of the pipeline, KafkaCustomTimestampPolicy instance is
being created multiple times.

Is there any documentation describing the guidelines one should follow when
implementing custom watermark?
How does checkpointing affect the watermark?

StackTrace from constructor of KafkaCustomTimestampPolicy:
 
com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41),
 
com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99),(KafkaIO.Read
instance is created here)
 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536),
 
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126),
 
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43),
 
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226),
 
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132),
 
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
 java.util.concurrent.FutureTask.run(FutureTask.java:266),
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
 java.lang.Thread.run(Thread.java:748)
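
For reference, a minimal sketch of what a custom TimestampPolicy with the
behaviour discussed in this thread might look like: event time taken from the
Kafka record timestamp, and a watermark that lags the newest observed event
time by a fixed maxDelay instead of jumping to the current time when the
partition is idle. The class name, the one-day maxDelay, and the
String/GenericRecord types are assumptions; this is not the actual
KafkaCustomTimestampPolicy:

import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

class IdleAwareTimestampPolicy extends TimestampPolicy<String, GenericRecord> {
  private final Duration maxDelay = Duration.standardDays(1); // assumed delay, tune as needed
  private Instant maxEventTimestamp;

  IdleAwareTimestampPolicy(Optional<Instant> previousWatermark) {
    // Resume from the checkpointed watermark so the watermark never regresses on restore.
    this.maxEventTimestamp =
        previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE).plus(maxDelay);
  }

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, GenericRecord> record) {
    Instant eventTime = new Instant(record.getTimestamp());
    if (eventTime.isAfter(maxEventTimestamp)) {
      maxEventTimestamp = eventTime;
    }
    return eventTime;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    // Hold the watermark maxDelay behind the newest event time observed on this partition,
    // rather than advancing it to "now" when the partition goes idle.
    return maxEventTimestamp.minus(maxDelay);
  }
}

It would be plugged in the same way as above, e.g.
.withTimestampPolicyFactory((tp, prevWatermark) -> new
IdleAwareTimestampPolicy(prevWatermark)). Note that, as observed in this
thread, the factory creates a new policy instance per reader (and again after
restores), so any per-partition state should be recoverable from
previousWatermark.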


Re: NullPointerException - Session windows with Lateness in FlinkRunner

2019-03-27 Thread rahul patwari
+dev

On Wed 27 Mar, 2019, 9:47 PM rahul patwari, 
wrote:

> Hi,
> I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster -
> 1.7.2.
>
> I have this flow in my pipeline:
> KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindow with
> gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default
> trigger)  -->  BeamSQL(GroupBy query)  -->  Window.remerge()  -->
> Enrichment  -->  KafkaSink
>
> I am generating data in such a way that the first two records belong to
> two different sessions. And, generating the third record before the first
> session expires with the timestamp for the third record in such a way that
> the two sessions will be merged to become a single session.
>
> For Example, These are the sample input and output obtained when I ran the
> same pipeline in DirectRunner.
>
> Sample Input:
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
>
> Sample Output:
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
> 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
> 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27
> 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
>
> Where "NumberOfRecords" is the count, "WST" is the Avro field Name which
> indicates the window start time for the session window. Similarly "WET"
> indicates the window End time of the session window. I am getting "WST" and
> "WET" after remerging and applying ParDo(Enrichment stage of the pipeline).
>
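For reference, a minimal sketch of the windowing configuration described above
(1-minute gap sessions, 3 minutes of allowed lateness, accumulating fired
panes, default trigger), assuming the Kafka records have already been converted
to Rows:

import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

class SessionWindowSketch {
  // Session windowing as described above; the default trigger re-fires for late
  // data within the allowed lateness, emitting the merged session each time.
  static PCollection<Row> applySessionWindow(PCollection<Row> kafkaRows) {
    return kafkaRows.apply(
        Window.<Row>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
            .withAllowedLateness(Duration.standardMinutes(3))
            .accumulatingFiredPanes());
  }
}
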
> The program ran successfully in DirectRunner. But, in FlinkRunner, I am
> getting this exception when the third record arrives:
>
> 2019-03-27 15:31:00,442 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
> -> (Window.Into()/Window.Assign.out ->
> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
> ->
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group
> by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,
> DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default
> key/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
> ProducerRecord/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
> (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
> 2019-03-27 15:33:25,427 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
> ->
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> ->
> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) ->
> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) ->
> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
> ->
> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default
> key/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
> ProducerRecord/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWri