Re: Staging a PCollection in Beam | Dataflow Runner
On Wed, Oct 19, 2022 at 2:43 PM Ravi Kapoor wrote:

> I am talking about the batch context. Can we do checkpointing in batch
> mode as well? I am *not* looking for a failure or retry mechanism. The
> requirement is simply to materialize a PCollection into some view/temp
> table that is auto-deleted, so that it can be used across jobs or within
> the job. I believe Reshuffle
> <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html>
> is for streaming. Right?
>
> Thanks,
> Ravi
>
> On Wed, Oct 19, 2022 at 1:32 PM Israel Herraiz via dev <dev@beam.apache.org> wrote: [...]

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
Re: Staging a PCollection in Beam | Dataflow Runner
I am talking about the batch context. Can we do checkpointing in batch mode as well? I am *not* looking for a failure or retry mechanism. The requirement is simply to materialize a PCollection into some view/temp table that is auto-deleted, so that it can be used across jobs or within the job.

I believe Reshuffle <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html> is for streaming. Right?

Thanks,
Ravi

On Wed, Oct 19, 2022 at 1:32 PM Israel Herraiz via dev wrote:

> I think that would be a Reshuffle
> <https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html>,
> but only within the context of the same job (e.g. if there is a failure and
> a retry, the retry would start from the checkpoint created by the
> reshuffle). In Dataflow, a group by key, a combine per key, a cogroup by
> key, stateful DoFns and, I think, splittable DoFns will also have the same
> effect of creating a checkpoint (any shuffling operation will always create
> a checkpoint).
>
> If you want to start a different job (slightly updated code, starting from
> a previous point of a previous job), in Dataflow that would be a snapshot
> <https://cloud.google.com/dataflow/docs/guides/using-snapshots>, I think.
> Snapshots only work in streaming pipelines.
>
> On Wed, 19 Oct 2022 at 08:45, Ravi Kapoor wrote: [...]

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
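To make the in-job checkpointing described above concrete, here is a minimal sketch (the transform names, element values, and class name are placeholders, not from the thread) that forces a materialization point with Reshuffle. Note this only helps within a single job; nothing is persisted across jobs:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReshuffleCheckpoint {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> expensive =
        p.apply(Create.of("a", "b", "c"))
            .apply("ExpensiveStep",
                MapElements.into(TypeDescriptors.strings())
                    .via((String s) -> s.toUpperCase()))
            // Forces a shuffle: on Dataflow, a downstream retry resumes from
            // the shuffled (materialized) data instead of re-running
            // "ExpensiveStep". The data does not outlive this job.
            .apply(Reshuffle.viaRandomKey());

    expensive.apply("Downstream",
        MapElements.into(TypeDescriptors.strings()).via((String s) -> s + "!"));

    p.run().waitUntilFinish();
  }
}
```

Reshuffle works in both batch and streaming; in batch it acts as a fusion break and checkpoint, not as a cross-job cache.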
Staging a PCollection in Beam | Dataflow Runner
Hi Team,

Can we stage a PCollection, i.e. the data in a PCollection? Let's say, to save re-running the expensive operations between two complex BQ tables time and again, we materialize the result in some temp view which is deleted after the session.

Is it possible to do that in a Beam pipeline? We could later use the temp view in another pipeline to read the data from and do processing.

Or, in general, I would like to know: do we ever stage a PCollection? Let's say I want to create another instance of the same job, which has complex processing. Does the pipeline re-perform the computation, or would it pick up the already-processed data from the previous instance, which must be staged somewhere?

Like in Spark, we have the notion of createOrReplaceTempView, which is used to create a temp table from a Spark DataFrame or Dataset.

Please advise.

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
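Beam itself has no direct createOrReplaceTempView equivalent; the closest pattern is to materialize the intermediate result explicitly. A hedged sketch (the "scratch" dataset and "joined_v1" table name are made up; it assumes the scratch table was pre-created in a dataset with a default table expiration, so BigQuery deletes the staged copy automatically, approximating a session-scoped temp view):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class StageToTempTable {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stand-in for the expensive join/aggregation over the two BQ tables.
    PCollection<TableRow> joined =
        p.apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A"));

    // Stage the result. A later job reads "project:scratch.joined_v1"
    // instead of recomputing; the dataset's default table expiration
    // plays the role of the "session".
    joined.apply("Stage",
        BigQueryIO.writeTableRows()
            .to("project:scratch.joined_v1")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
  }
}
```

The staged table is a side effect outside the Beam model, so the second pipeline must be a separate job run after this one finishes.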
Re: Write data to Jdbc from BigQueryIO | Issue
Hello Team,

Is anyone aware of this problem?

Thanks,
Ravi

On Sun, Jun 19, 2022 at 10:59 PM Ravi Kapoor wrote: [...]

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
Write data to Jdbc from BigQueryIO | Issue
Hi Team,

I am trying to write a PCollection read from BQ, with schema

    final Schema schema =
        Schema.builder().addInt64Field("user_id").addStringField("user_name").build();

to a JDBC datasource (Oracle) whose table schema is:

    Table_A ( user_id NUMBER(3), user_name varchar(10) )

The code flow is such that this invokes

    PCollection expand(PCollection input)

which internally calls

    List fields = spec.getFilteredFields(input.getSchema());

This converts the ResultSetMetaData to a Beam Schema, and for the numeric type the jdbcTypeToBeamFieldConverter method sets LogicalTypes.FixedPrecisionNumeric.of as the FieldType, whose base type is DECIMAL.

The fields are then compared with

    SchemaUtil.compareSchemaField(tableField, f)

which subsequently compares the field types via

    static boolean compareSchemaFieldType(Schema.FieldType a, Schema.FieldType b) {

The following line returns false, because the base type resolved for the Oracle user_id field is DECIMAL while the BQ field is INT64:

    return a.getLogicalType().getBaseType().getTypeName().equals(b.getTypeName());

And it eventually fails with:

    "Provided schema doesn't match with database schema. " + " Table has fields: "

Can you please check whether this is a bug in matching the BQ data type to Oracle? Or do I need different handling when writing to a JDBC source?

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
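One way to sidestep JdbcIO's automatic schema matching entirely (a sketch only; the Oracle driver class, JDBC URL, and class name are illustrative placeholders, and this works around rather than fixes the reported comparison) is to supply the insert statement and a PreparedStatementSetter yourself:

```java
import java.sql.PreparedStatement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class WriteRowsToOracle {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    Schema schema = Schema.builder()
        .addInt64Field("user_id")
        .addStringField("user_name")
        .build();

    // Stand-in for the PCollection<Row> read from BigQuery.
    PCollection<Row> users = p.apply(
        Create.of(Row.withSchema(schema).addValues(1L, "alice").build())
            .withRowSchema(schema));

    users.apply(JdbcIO.<Row>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "oracle.jdbc.OracleDriver",                 // illustrative
            "jdbc:oracle:thin:@//dbhost:1521/service")) // illustrative
        // With an explicit statement and setter, JdbcIO does not infer the
        // table schema, so the INT64-vs-NUMBER(3) comparison never runs.
        .withStatement("insert into Table_A (user_id, user_name) values (?, ?)")
        .withPreparedStatementSetter((Row row, PreparedStatement st) -> {
          st.setLong(1, row.getInt64("user_id"));
          st.setString(2, row.getString("user_name"));
        }));

    p.run().waitUntilFinish();
  }
}
```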
Re: Chained Job Graph Apache Beam | Dataflow
FYI

On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor wrote: [...]

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
Re: Chained Job Graph Apache Beam | Dataflow
Hi Daniel,

I have a use case where I join two tables, say A and B, and write the joined collection to C. Then I would like to filter some records of C and put them into another table, say D. So the pipeline on the Dataflow UI should look like this, right?

    A
     \
      C -> D
     /
    B

However, the pipeline is writing C -> D in parallel. How can this part of the pipeline run in parallel, when the data has not yet been pushed to C by the previous stage?

Indeed, when I ran this pipeline, table D did not get any records inserted, which is apparent. Can you help me with this use case?

Thanks,
Ravi

On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins wrote:

> Can you speak to what specifically you want to be different? The job graph
> you see, with A -> B and B -> C separate, is an accurate reflection of
> your pipeline. table_B is outside of the Beam model; by pushing your data
> there, Dataflow has no ability to identify that no manipulation of data is
> happening at table_B.
>
> If you want to just process data from A to destinations D and E, while
> writing an intermediate output to table_B, you should just remove the read
> from table_B and modify table_A_records again directly. If that is not what
> you want, you would need to explain more specifically what you want that is
> different. Is it a pure UI change? Is it a functional change?
>
> -Daniel
>
> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor wrote: [...]

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
Re: Chained Job Graph Apache Beam | Dataflow
Team,

Any update on this?

On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor wrote: [...]

--
Thanks,
Ravi Kapoor
+91-9818764564
kapoorrav...@gmail.com
Chained Job Graph Apache Beam | Dataflow
Hi Team,

I am currently using Beam in my project with the Dataflow runner. I am trying to create a pipeline where the data flows from the source to staging and then to the target, such as:

    A (Source) -> B (Staging) -> C (Target)

When I create a pipeline as below:

    PCollection<TableRow> table_A_records =
        p.apply(BigQueryIO.readTableRows().from("project:dataset.table_A"));

    table_A_records.apply(
        BigQueryIO.writeTableRows()
            .to("project:dataset.table_B")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    PCollection<TableRow> table_B_records =
        p.apply(BigQueryIO.readTableRows().from("project:dataset.table_B"));

    table_B_records.apply(
        BigQueryIO.writeTableRows()
            .to("project:dataset.table_C")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();

it basically creates two disconnected job graphs in Dataflow:

    A -> B
    B -> C

instead of the chained transformation I expected. I need to create a data pipeline in which the data flows in a chain, like:

                 D
                /
    A -> B -> C
                \
                 E

Is there a way to achieve this transformation between the source and target tables?

Thanks,
Ravi
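Daniel's suggestion from this thread can be sketched as follows: write the intermediate to table_B as a branch, but keep transforming the same in-memory PCollection rather than re-reading B, so the graph stays connected. Table names are from the thread; the filter predicate and class name are made up. If a later stage genuinely must read back what an earlier stage wrote to BigQuery, that ordering is not expressible in one batch graph this way and generally means running a second job:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;

public class ChainedGraph {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<TableRow> joined =
        p.apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A"));

    // Branch 1: persist the intermediate result to table_B.
    joined.apply("WriteB",
        BigQueryIO.writeTableRows()
            .to("project:dataset.table_B")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    // Branch 2: continue from the same in-memory PCollection instead of
    // re-reading table_B, so the Dataflow UI shows one connected graph.
    PCollection<TableRow> filtered =
        joined.apply("FilterForC",
            Filter.by((TableRow r) -> r.get("user_id") != null)); // made-up predicate

    filtered.apply("WriteC",
        BigQueryIO.writeTableRows()
            .to("project:dataset.table_C")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
  }
}
```

Further branches to D and E would hang off `filtered` (or `joined`) the same way.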