Re: Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Ravi Kapoor
On Wed, Oct 19, 2022 at 2:43 PM Ravi Kapoor  wrote:

> I am talking about the batch context. Can we do checkpointing in batch mode
> as well?
> I am *not* looking for a failure or retry mechanism.
> The requirement is simply to materialize a PCollection which can be used
> across jobs or within a job, in some view/temp table that is auto-deleted.
> I believe Reshuffle
> <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html>
> is for streaming. Right?
>
> Thanks,
> Ravi
>
> On Wed, Oct 19, 2022 at 1:32 PM Israel Herraiz via dev <
> dev@beam.apache.org> wrote:
>
>> I think that would be a Reshuffle
>> <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html>,
>> but only within the context of the same job (e.g. if there is a failure and
>> a retry, the retry would start from the checkpoint created by the
>> Reshuffle). In Dataflow, a group by key, a combiner per key, a cogroup by
>> key, stateful DoFns and, I think, splittable DoFns will also have the same
>> effect of creating a checkpoint (any shuffling operation will always create
>> a checkpoint).
>>
>> If you want to start a different job (slightly updated code, starting
>> from a previous point of a previous job), in Dataflow that would be a
>> snapshot <https://cloud.google.com/dataflow/docs/guides/using-snapshots>,
>> I think. Snapshots only work in streaming pipelines.
>>
>> On Wed, 19 Oct 2022 at 08:45, Ravi Kapoor  wrote:
>>
>>> Hi Team,
>>> Can we stage a PCollection or its data? Let's say we want to avoid
>>> repeating the expensive operations between two complex BQ tables time
>>> and again, and instead materialize the result in some temp view which is
>>> deleted after the session.
>>>
>>> Is it possible to do that in a Beam pipeline?
>>> We could then read from the temp view in another pipeline and do further
>>> processing.
>>>
>>> Or, in general, I would like to know: do we ever stage a PCollection?
>>> Let's say I want to create another instance of the same job, which has
>>> complex processing.
>>> Does the pipeline re-perform the computation, or would it pick up the
>>> already-processed data from the previous instance, which must be staged
>>> somewhere?
>>>
>>> In Spark, for example, we have createOrReplaceTempView, which is used to
>>> create a temp table from a Spark DataFrame or Dataset.
>>>
>>> Please advise.
>>>
>>> --
>>> Thanks,
>>> Ravi Kapoor
>>> +91-9818764564
>>> kapoorrav...@gmail.com
>>>
>>
>
> --
> Thanks,
> Ravi Kapoor
> +91-9818764564
> kapoorrav...@gmail.com
>


-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Re: Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Ravi Kapoor
I am talking about the batch context. Can we do checkpointing in batch mode
as well?
I am *not* looking for a failure or retry mechanism.
The requirement is simply to materialize a PCollection which can be used
across jobs or within a job, in some view/temp table that is auto-deleted.
I believe Reshuffle
<https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html>
is for streaming. Right?

Thanks,
Ravi

On Wed, Oct 19, 2022 at 1:32 PM Israel Herraiz via dev 
wrote:

> I think that would be a Reshuffle
> <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html>,
> but only within the context of the same job (e.g. if there is a failure and
> a retry, the retry would start from the checkpoint created by the
> Reshuffle). In Dataflow, a group by key, a combiner per key, a cogroup by
> key, stateful DoFns and, I think, splittable DoFns will also have the same
> effect of creating a checkpoint (any shuffling operation will always create
> a checkpoint).
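For reference, a minimal sketch of that pattern in a batch Java pipeline; the
"ExpensiveJoin" transform and the table names here are hypothetical
placeholders, and Reshuffle works in batch pipelines as well as streaming:

// Imports assumed: org.apache.beam.sdk.transforms.Reshuffle,
// org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO,
// com.google.api.services.bigquery.model.TableRow.
PCollection<TableRow> joined =
    p.apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A"))
     .apply("ExpensiveJoin", new ExpensiveJoin())     // hypothetical expensive PTransform
     .apply("Checkpoint", Reshuffle.viaRandomKey());  // shuffle boundary: downstream
                                                      // retries do not recompute ExpensiveJoin

joined.apply("WriteC", BigQueryIO.writeTableRows()
    .to("project:dataset.table_C")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));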
>
> If you want to start a different job (slightly updated code, starting from
> a previous point of a previous job), in Dataflow that would be a snapshot
> <https://cloud.google.com/dataflow/docs/guides/using-snapshots>, I think.
> Snapshots only work in streaming pipelines.
>
> On Wed, 19 Oct 2022 at 08:45, Ravi Kapoor  wrote:
>
>> Hi Team,
>> Can we stage a PCollection or its data? Let's say we want to avoid
>> repeating the expensive operations between two complex BQ tables time and
>> again, and instead materialize the result in some temp view which is
>> deleted after the session.
>>
>> Is it possible to do that in a Beam pipeline?
>> We could then read from the temp view in another pipeline and do further
>> processing.
>>
>> Or, in general, I would like to know: do we ever stage a PCollection?
>> Let's say I want to create another instance of the same job, which has
>> complex processing.
>> Does the pipeline re-perform the computation, or would it pick up the
>> already-processed data from the previous instance, which must be staged
>> somewhere?
>>
>> In Spark, for example, we have createOrReplaceTempView, which is used to
>> create a temp table from a Spark DataFrame or Dataset.
>>
>> Please advise.
>>
>> --
>> Thanks,
>> Ravi Kapoor
>> +91-9818764564
>> kapoorrav...@gmail.com
>>
>

-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Ravi Kapoor
Hi Team,
Can we stage a PCollection or its data? Let's say we want to avoid repeating
the expensive operations between two complex BQ tables time and again, and
instead materialize the result in some temp view which is deleted after the
session.

Is it possible to do that in a Beam pipeline?
We could then read from the temp view in another pipeline and do further
processing.

Or, in general, I would like to know: do we ever stage a PCollection?
Let's say I want to create another instance of the same job, which has
complex processing.
Does the pipeline re-perform the computation, or would it pick up the
already-processed data from the previous instance, which must be staged
somewhere?

In Spark, for example, we have createOrReplaceTempView, which is used to
create a temp table from a Spark DataFrame or Dataset.
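To make the requirement concrete, the kind of staging I have in mind would
look roughly like the sketch below. It assumes a BigQuery dataset
("staging_dataset") whose default table expiration provides the auto-delete
behaviour, since Beam itself would not manage the table's lifetime; the
transform and schema names are placeholders:

// Job 1: materialize the expensive intermediate result into a staging table.
PCollection<TableRow> joined = p
    .apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A"))
    .apply("ExpensiveJoinWithB", new ExpensiveJoinWithB());  // hypothetical PTransform

joined.apply("StageJoined", BigQueryIO.writeTableRows()
    .to("project:staging_dataset.joined_tmp")
    .withSchema(joinedTableSchema)   // TableSchema of the joined rows
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

// Job 2 (a separate pipeline): read the staged table instead of recomputing.
PCollection<TableRow> staged = p2.apply("ReadStaged",
    BigQueryIO.readTableRows().from("project:staging_dataset.joined_tmp"));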

Please advise.

-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Re: Write data to Jdbc from BigQueryIO | Issue

2022-06-21 Thread Ravi Kapoor
Hello Team,
Is anyone aware of this problem?

Thanks,
Ravi

On Sun, Jun 19, 2022 at 10:59 PM Ravi Kapoor  wrote:

> Hi Team
>
> I am trying to write a PCollection read from BQ, with the schema
>
> *final Schema schema =
> Schema.builder().addInt64Field("user_id").addStringField("user_name").build();*
>
> to a JDBC datasource (Oracle) whose table schema is:
>
> Table_A ( user_id NUMBER(3), user_name varchar(10) )
>
> The code flow is such that this invokes
>
> *PCollection expand(PCollection input)*
>
> which internally calls
>
> *List fields = spec.getFilteredFields(input.getSchema());*
>
> This converts the ResultSetMetaData to a Beam Schema, and for the numeric
> type the jdbcTypeToBeamFieldConverter method sets
> *LogicalTypes.FixedPrecisionNumeric.of* as the FieldType, whose base type
> is DECIMAL.
>
> *SchemaUtil.compareSchemaField(tableField, f)*
>
> then compares the schema field types using the method below:
>
> *static boolean compareSchemaFieldType(Schema.FieldType a, Schema.FieldType b) {*
>
> The following line returns false:
>
> *return
> a.getLogicalType().getBaseType().getTypeName().equals(b.getTypeName());*
>
> because the base type is DECIMAL for the user_id field on the Oracle side,
> while the field derived from BQ is INT64.
>
> And it eventually fails with
>
> *"Provided schema doesn't match with database schema. " + " Table has
> fields: "*
>
> Can you please check whether this is a bug in matching the BQ data type to
> Oracle, or do I need different handling when writing to a JDBC sink?
>
> --
> Thanks,
> Ravi Kapoor
> +91-9818764564
> kapoorrav...@gmail.com
>


-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Write data to Jdbc from BigQueryIO | Issue

2022-06-19 Thread Ravi Kapoor
Hi Team

I am trying to write a PCollection read from BQ, with the schema

*final Schema schema =
Schema.builder().addInt64Field("user_id").addStringField("user_name").build();*

to a JDBC datasource (Oracle) whose table schema is:

Table_A ( user_id NUMBER(3), user_name varchar(10) )

The code flow is such that this invokes

*PCollection expand(PCollection input)*

which internally calls

*List fields = spec.getFilteredFields(input.getSchema());*

This converts the ResultSetMetaData to a Beam Schema, and for the numeric
type the jdbcTypeToBeamFieldConverter method sets
*LogicalTypes.FixedPrecisionNumeric.of* as the FieldType, whose base type is
DECIMAL.

*SchemaUtil.compareSchemaField(tableField, f)*

then compares the schema field types using the method below:

*static boolean compareSchemaFieldType(Schema.FieldType a, Schema.FieldType b) {*

The following line returns false:

*return
a.getLogicalType().getBaseType().getTypeName().equals(b.getTypeName());*

because the base type is DECIMAL for the user_id field on the Oracle side,
while the field derived from BQ is INT64.

And it eventually fails with

*"Provided schema doesn't match with database schema. " + " Table has
fields: "*

Can you please check whether this is a bug in matching the BQ data type to
Oracle, or do I need different handling when writing to a JDBC sink?
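
A possible workaround, sketched here for illustration only (untested against
the FixedPrecisionNumeric precision check), would be to cast the INT64 field
to a DECIMAL on the Beam side before handing the rows to the JdbcIO write, so
the Row schema lines up with what jdbcTypeToBeamFieldConverter derives for
Oracle NUMBER. "bqRows" stands for the schema-aware PCollection<Row> read
from BQ:

// Imports assumed: java.math.BigDecimal, org.apache.beam.sdk.schemas.Schema,
// org.apache.beam.sdk.transforms.MapElements, org.apache.beam.sdk.values.Row,
// org.apache.beam.sdk.values.TypeDescriptors.
Schema jdbcSchema = Schema.builder()
    .addDecimalField("user_id")      // DECIMAL base type, matching Oracle NUMBER
    .addStringField("user_name")
    .build();

PCollection<Row> forJdbc = bqRows
    .apply("CastUserIdToDecimal", MapElements
        .into(TypeDescriptors.rows())
        .via((Row r) -> Row.withSchema(jdbcSchema)
            .addValues(BigDecimal.valueOf(r.getInt64("user_id")),
                       r.getString("user_name"))
            .build()))
    .setRowSchema(jdbcSchema);
// forJdbc would then be handed to the JdbcIO write as before.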

-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Re: Chained Job Graph Apache Beam | Dataflow

2022-06-15 Thread Ravi Kapoor
FYI

On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor  wrote:

> Hi Daniel,
>
> I have a use case where I join two tables, say A and B, and write the
> joined collection to C.
> Then I would like to filter some records from C and put them into another
> table, say D.
> So the pipeline on the Dataflow UI should look like this, right?
>
> A
>  \
>   C -> D
>  /
> B
>
> However, the pipeline writes C -> D in parallel.
> How can this step run in parallel when no data has been pushed to C yet by
> the previous step?
>
> Indeed, when I ran this pipeline, table D did not get any records inserted,
> which is what you would expect.
> Can you help me with this use case?
>
> Thanks,
> Ravi
>
>
>
> On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins 
> wrote:
>
>> Can you speak to what specifically you want to be different? The job
>> graph you see, with A -> B and B -> C being separate, is an accurate
>> reflection of your pipeline. table_B is outside of the Beam model; by
>> pushing your data there, Dataflow has no ability to identify that no
>> manipulation of data is happening at table_B.
>>
>> If you want to just process data from A to destinations D and E, while
>> writing an intermediate output to table_B, you should just remove the read
>> from table B and modify table_A_records again directly. If that is not what
>> you want, you would need to explain more specifically what you want that is
>> different. Is it a pure UI change? Is it a functional change?
>>
>> -Daniel
>>
>> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor 
>> wrote:
>>
>>> Team,
>>> Any update on this?
>>>
>>> On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor 
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am currently using Beam in my project with the Dataflow runner.
>>>> I am trying to create a pipeline where the data flows from source to
>>>> staging and then to target, such as:
>>>>
>>>> A (Source) -> B (Staging) -> C (Target)
>>>>
>>>> When I create a pipeline as below:
>>>>
>>>> PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
>>>>         .from("project:dataset.table_A"));
>>>>
>>>> table_A_records.apply(BigQueryIO.writeTableRows()
>>>>         .to("project:dataset.table_B")
>>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>
>>>> PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
>>>>         .from("project:dataset.table_B"));
>>>>
>>>> table_B_records.apply(BigQueryIO.writeTableRows()
>>>>         .to("project:dataset.table_C")
>>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>
>>>> p.run().waitUntilFinish();
>>>>
>>>> it basically creates two parallel job graphs in Dataflow:
>>>>
>>>> A -> B
>>>> B -> C
>>>>
>>>> instead of the chained transformation I expected. I need to create a
>>>> data pipeline which flows the data in a chain, like:
>>>>
>>>>           D
>>>>          /
>>>> A -> B -> C
>>>>          \
>>>>           E
>>>>
>>>> Is there a way to achieve this transformation between the source and
>>>> target tables?
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Ravi Kapoor
>>> +91-9818764564
>>> kapoorrav...@gmail.com
>>>
>>
>
> --
> Thanks,
> Ravi Kapoor
> +91-9818764564
> kapoorrav...@gmail.com
>


-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Re: Chained Job Graph Apache Beam | Dataflow

2022-06-15 Thread Ravi Kapoor
Hi Daniel,

I have a use case where I join two tables, say A and B, and write the joined
collection to C.
Then I would like to filter some records from C and put them into another
table, say D.
So the pipeline on the Dataflow UI should look like this, right?

A
 \
  C -> D
 /
B

However, the pipeline writes C -> D in parallel.
How can this step run in parallel when no data has been pushed to C yet by
the previous step?

Indeed, when I ran this pipeline, table D did not get any records inserted,
which is what you would expect.
Can you help me with this use case?

Thanks,
Ravi



On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins  wrote:

> Can you speak to what specifically you want to be different? The job graph
> you see, with A -> B and B -> C being separate, is an accurate reflection
> of your pipeline. table_B is outside of the Beam model; by pushing your
> data there, Dataflow has no ability to identify that no manipulation of
> data is happening at table_B.
>
> If you want to just process data from A to destinations D and E, while
> writing an intermediate output to table_B, you should just remove the read
> from table B and modify table_A_records again directly. If that is not what
> you want, you would need to explain more specifically what you want that is
> different. Is it a pure UI change? Is it a functional change?
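Concretely, a sketch of that suggestion, reusing the table names from the
earlier snippet (the Filter step and its "some_field" condition are only
stand-ins for the real processing between B and C):

PCollection<TableRow> table_A_records = p.apply("ReadA",
    BigQueryIO.readTableRows().from("project:dataset.table_A"));

// Branch 1: persist the intermediate output to table_B.
table_A_records.apply("WriteB", BigQueryIO.writeTableRows()
    .to("project:dataset.table_B")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

// Branch 2: keep processing the same in-pipeline PCollection instead of
// re-reading table_B, so the graph stays connected as A -> {B, C}.
table_A_records
    .apply("FilterForC", Filter.by((TableRow row) -> row.get("some_field") != null))
    .apply("WriteC", BigQueryIO.writeTableRows()
        .to("project:dataset.table_C")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));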
>
> -Daniel
>
> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor 
> wrote:
>
>> Team,
>> Any update on this?
>>
>> On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor 
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am currently using Beam in my project with the Dataflow runner.
>>> I am trying to create a pipeline where the data flows from source to
>>> staging and then to target, such as:
>>>
>>> A (Source) -> B (Staging) -> C (Target)
>>>
>>> When I create a pipeline as below:
>>>
>>> PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
>>>         .from("project:dataset.table_A"));
>>>
>>> table_A_records.apply(BigQueryIO.writeTableRows()
>>>         .to("project:dataset.table_B")
>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>
>>> PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
>>>         .from("project:dataset.table_B"));
>>>
>>> table_B_records.apply(BigQueryIO.writeTableRows()
>>>         .to("project:dataset.table_C")
>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>
>>> p.run().waitUntilFinish();
>>>
>>> it basically creates two parallel job graphs in Dataflow:
>>>
>>> A -> B
>>> B -> C
>>>
>>> instead of the chained transformation I expected. I need to create a
>>> data pipeline which flows the data in a chain, like:
>>>
>>>           D
>>>          /
>>> A -> B -> C
>>>          \
>>>           E
>>>
>>> Is there a way to achieve this transformation between the source and
>>> target tables?
>>>
>>> Thanks,
>>> Ravi
>>>
>>
>>
>> --
>> Thanks,
>> Ravi Kapoor
>> +91-9818764564
>> kapoorrav...@gmail.com
>>
>

-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Re: Chained Job Graph Apache Beam | Dataflow

2022-06-14 Thread Ravi Kapoor
Team,
Any update on this?

On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor  wrote:

> Hi Team,
>
> I am currently using Beam in my project with the Dataflow runner.
> I am trying to create a pipeline where the data flows from source to
> staging and then to target, such as:
>
> A (Source) -> B (Staging) -> C (Target)
>
> When I create a pipeline as below:
>
> PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
>         .from("project:dataset.table_A"));
>
> table_A_records.apply(BigQueryIO.writeTableRows()
>         .to("project:dataset.table_B")
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>
> PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
>         .from("project:dataset.table_B"));
>
> table_B_records.apply(BigQueryIO.writeTableRows()
>         .to("project:dataset.table_C")
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>
> p.run().waitUntilFinish();
>
> it basically creates two parallel job graphs in Dataflow:
>
> A -> B
> B -> C
>
> instead of the chained transformation I expected. I need to create a data
> pipeline which flows the data in a chain, like:
>
>           D
>          /
> A -> B -> C
>          \
>           E
>
> Is there a way to achieve this transformation between the source and
> target tables?
>
> Thanks,
> Ravi
>


-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com


Chained Job Graph Apache Beam | Dataflow

2022-06-13 Thread Ravi Kapoor
Hi Team,

I am currently using Beam in my project with the Dataflow runner.
I am trying to create a pipeline where the data flows from source to staging
and then to target, such as:

A (Source) -> B (Staging) -> C (Target)

When I create a pipeline as below:

PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
        .from("project:dataset.table_A"));

table_A_records.apply(BigQueryIO.writeTableRows()
        .to("project:dataset.table_B")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
        .from("project:dataset.table_B"));

table_B_records.apply(BigQueryIO.writeTableRows()
        .to("project:dataset.table_C")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();

it basically creates two parallel job graphs in Dataflow:

A -> B
B -> C

instead of the chained transformation I expected. I need to create a data
pipeline which flows the data in a chain, like:

          D
         /
A -> B -> C
         \
          E

Is there a way to achieve this transformation between the source and target
tables?
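
In code terms, the shape I am hoping to express is roughly the following,
where AToB and BToC are placeholders for the real transformations; the point
is that D and E fan out from the same in-pipeline collection rather than from
separate jobs:

PCollection<TableRow> a = p.apply("ReadA",
    BigQueryIO.readTableRows().from("project:dataset.table_A"));

PCollection<TableRow> b = a.apply("AToB", new AToB());   // hypothetical transform
PCollection<TableRow> c = b.apply("BToC", new BToC());   // hypothetical transform

// C is written out and also fans out to D as an additional branch;
// E would be added the same way with another write on c.
c.apply("WriteC", BigQueryIO.writeTableRows().to("project:dataset.table_C")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
c.apply("WriteD", BigQueryIO.writeTableRows().to("project:dataset.table_D")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));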

Thanks,
Ravi