Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-16 Thread rahul patwari
Hi Reza, Rui,

Can we use the [slowly changing lookup cache] approach in BeamSQL when the
source is HDFS or Hive (whose data is changing) and the PCollection cannot
fit into memory? This PCollection will be joined with a windowed PCollection
created from reading Kafka, in a BeamSQL query.
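
For context, this is the shape we are after, written as a plain side-input
join rather than SQL (a sketch only: the reads are stubbed out and the key
and field names are made up):

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // events: the windowed main stream from Kafka, keyed by id.
    // lookup: KVs produced by the slowly refreshed HDFS/Hive read.
    static PCollection<KV<String, String>> joinWithLookup(
        PCollection<KV<String, String>> events,
        PCollection<KV<String, String>> lookup) {
      // Materialize the lookup data as a map-valued side input.
      final PCollectionView<Map<String, String>> lookupView =
          lookup.apply(View.asMap());
      return events.apply(ParDo.of(
          new DoFn<KV<String, String>, KV<String, String>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              // Enrich each event with the value looked up by its key.
              String label = c.sideInput(lookupView).get(c.element().getKey());
              c.output(KV.of(c.element().getKey(),
                  c.element().getValue() + "," + label));
            }
          }).withSideInputs(lookupView));
    }

Even written this way, the side input has to be materialized for the
workers, which is the part we are unsure about for data of this size.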

Thanks and Regards,
Rahul



Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-16 Thread Reza Rokni
+1



Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-16 Thread Rui Wang
Another approach is to let BeamSQL support it natively, as the title of
this thread says: "as a Table in BeamSQL".

We might be able to define a table whose properties say that the table
returns a PCollectionView. That would make a trigger-based PCollectionView
available in the SQL rel nodes, so SQL could implement [*Pattern:
Slowly-changing lookup cache*]. This way, users would only need to
construct a table and set it on SqlTransform.
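
For reference, this is roughly how a PCollection is handed to SqlTransform
today, as a named table in a PCollectionTuple (a sketch; table and column
names are illustrative). The proposal above would instead let a table be
backed by a trigger-based PCollectionView:

    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TupleTag;

    // events: windowed Rows from Kafka; lookup: the slowly changing Rows.
    static PCollection<Row> enrich(
        PCollection<Row> events, PCollection<Row> lookup) {
      return PCollectionTuple
          .of(new TupleTag<>("events"), events)
          .and(new TupleTag<>("lookup"), lookup)
          .apply(SqlTransform.query(
              "SELECT e.id, e.payload, l.label "
                  + "FROM events e JOIN lookup l ON e.id = l.id"));
    }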

I created a JIRA to track this idea:
https://jira.apache.org/jira/browse/BEAM-7758


-Rui




Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-16 Thread Reza Rokni
Hi Rahul,

FYI, that pattern is also available in the Beam docs (with an updated code
example):
https://beam.apache.org/documentation/patterns/side-input-patterns/.

Please note that in the DoFn that feeds View.asSingleton() you will need to
call BigQuery manually, using the BigQuery client.
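
Condensed, the pattern on that page looks roughly like this (a sketch: the
refresh rate is arbitrary and buildLookup() is a placeholder for the manual
BigQuery client call):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.joda.time.Duration;

    static PCollectionView<Map<String, String>> slowlyChangingView(Pipeline p) {
      return p
          // Emit one tick per refresh interval.
          .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
          // A single global window that fires on every tick.
          .apply(Window.<Long>into(new GlobalWindows())
              .triggering(Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()))
              .discardingFiredPanes())
          // On each firing, rebuild the lookup table from the source.
          .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              Map<String, String> lookup = new HashMap<>();
              // lookup.putAll(buildLookup());  // the manual client call
              c.output(lookup);
            }
          }))
          // Expose the latest snapshot as a singleton side input.
          .apply(View.asSingleton());
    }

The main stream then consumes it with .withSideInputs(view) on its ParDo
and c.sideInput(view) inside its DoFn.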

Regards

Reza

On Tue, 16 Jul 2019 at 14:37, rahul patwari 
wrote:

> Hi,
>
> We are following [*Pattern: Slowly-changing lookup cache*] from
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>
> We have a use case to read slowly changing bounded data as a PCollection,
> alongside the main (windowed) PCollection from Kafka, and use both in a
> BeamSQL query.
>
> Is it possible to design such a use case with Beam Java SDK?
>
> Approaches tried so far, without success:
>
> 1) GenerateSequence => GlobalWindow with a data trigger => composite
> transform (which applies a Beam IO on the pipeline obtained via
> PCollection.getPipeline()) => convert the resulting PCollection to
> PCollection<Row> => apply BeamSQL.
> Comments: The Beam IO reads the data only once, even though GenerateSequence
> emits a long value periodically. The expectation was that whenever a long
> value is emitted, the IO would be used to read the latest data. Is this
> because of optimizations in the DAG? Can the optimizations be overridden?
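>
> (A simplified sketch of what our composite does; TextIO and the path stand
> in for the actual IO:)
>
>   import org.apache.beam.sdk.io.TextIO;
>   import org.apache.beam.sdk.transforms.PTransform;
>   import org.apache.beam.sdk.values.PCollection;
>
>   static class ReadSnapshot
>       extends PTransform<PCollection<Long>, PCollection<String>> {
>     @Override
>     public PCollection<String> expand(PCollection<Long> ticks) {
>       // Presumably this is applied only once, while the pipeline graph is
>       // being built, which would explain the single read.
>       return ticks.getPipeline()
>           .apply(TextIO.read().from("hdfs://namenode/path/to/lookup/*"));
>     }
>   }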
>
> 2) The pipeline is the same as in approach 1, but instead of a composite
> transform, a DoFn is used, in which a for loop emits each Row of the
> PCollection.
> Comments: The output PCollection is unbounded, but we need a bounded
> PCollection, as it is joined with the PCollection of each window from
> Kafka. How can we convert an unbounded PCollection to a bounded PCollection
> inside a DoFn?
>
> Are there any better approaches?
>
> Regards,
> Rahul