It may also be helpful to explore CoGroupByKey as a way of joining data, though depending on the shape of the data the join may not fit in memory: https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/
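A minimal sketch of such a CoGroupByKey join (the keyed inputs `keyedA`/`keyedB`, the `String` key type, and the `mergeRows` helper are assumed for illustration; note that all values for a given key must fit in memory on one worker, hence the caveat above):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// keyedA, keyedB: PCollection<KV<String, TableRow>>, keyed by the join column (assumed).
final TupleTag<TableRow> aTag = new TupleTag<>();
final TupleTag<TableRow> bTag = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> grouped =
    KeyedPCollectionTuple.of(aTag, keyedA)
        .and(bTag, keyedB)
        .apply(CoGroupByKey.create());

PCollection<TableRow> joined = grouped.apply(
    ParDo.of(new DoFn<KV<String, CoGbkResult>, TableRow>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // All values for this key, from both inputs, are available here --
        // which is why a hot key can exceed a single worker's memory.
        for (TableRow a : c.element().getValue().getAll(aTag)) {
          for (TableRow b : c.element().getValue().getAll(bTag)) {
            c.output(mergeRows(a, b)); // mergeRows: assumed merge helper
          }
        }
      }
    }));
```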
- Evan

On Wed, Jun 15, 2022 at 3:45 PM Bruno Volpato <brunocvcu...@gmail.com> wrote:

> Hello,
>
> I am not sure what the context behind your join is, but I just wanted to
> point out that Beam SQL [1] or the Join-library extension [2] may be
> helpful in your scenario, to avoid changing semantics or the need to
> orchestrate your jobs outside Beam.
>
> [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
> [2] https://beam.apache.org/documentation/sdks/java-extensions/
>
> Best,
> Bruno
>
> On Wed, Jun 15, 2022 at 3:35 PM Jack McCluskey <jrmcclus...@google.com> wrote:
>
>> Hey Ravi,
>>
>> The problem you're running into is that writing data to a table and
>> reading from it are not joined actions in the Beam model. There's no
>> connecting PCollection tying them together, so they are split and run in
>> parallel. If you want to do this and need the data written to C, you
>> should re-use the PCollection written to C in your filtering step instead
>> of reading the data from C again. That should produce the graph you're
>> looking for in a batch context.
>>
>> Thanks,
>>
>> Jack McCluskey
>>
>> On Wed, Jun 15, 2022 at 3:30 PM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>>
>>> FYI
>>>
>>> On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>>>
>>>> Hi Daniel,
>>>>
>>>> I have a use case where I join two tables, say A and B, and write the
>>>> joined collection to C. Then I would like to filter some records of C
>>>> and put them into another table, say D. So the pipeline on the Dataflow
>>>> UI should look like this, right?
>>>>
>>>> A
>>>>  \
>>>>   C -> D
>>>>  /
>>>> B
>>>>
>>>> However, the pipeline writes C -> D in parallel. How can this stage run
>>>> in parallel when data has not yet been pushed to C by the previous
>>>> stage?
>>>>
>>>> Even when I ran this pipeline, table D did not get any records
>>>> inserted, which is apparent. Can you help me with this use case?
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>> On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins <dpcoll...@google.com> wrote:
>>>>
>>>>> Can you speak to what specifically you want to be different? The job
>>>>> graph you see, with A -> B and B -> C being separate, is an accurate
>>>>> reflection of your pipeline. table_B is outside of the Beam model; by
>>>>> pushing your data there, Dataflow has no ability to identify that no
>>>>> manipulation of the data is happening at table_B.
>>>>>
>>>>> If you just want to process data from A to destinations D and E, while
>>>>> writing an intermediate output to table_B, you should remove the read
>>>>> from table B and transform table_A_records again directly. If that is
>>>>> not what you want, you would need to explain more specifically what
>>>>> you want that is different. Is it a pure UI change? Is it a functional
>>>>> change?
>>>>>
>>>>> -Daniel
>>>>>
>>>>> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>>>>>
>>>>>> Team,
>>>>>> Any update on this?
>>>>>>
>>>>>> On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor <kapoorrav...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I am currently using Beam in my project with the Dataflow runner.
>>>>>>> I am trying to create a pipeline where the data flows from the
>>>>>>> source to staging and then to the target, such as:
>>>>>>>
>>>>>>> A (Source) -> B (Staging) -> C (Target)
>>>>>>>
>>>>>>> When I create a pipeline as below:
>>>>>>>
>>>>>>> PCollection<TableRow> table_A_records =
>>>>>>>     p.apply(BigQueryIO.readTableRows()
>>>>>>>         .from("project:dataset.table_A"));
>>>>>>>
>>>>>>> table_A_records.apply(BigQueryIO.writeTableRows()
>>>>>>>     .to("project:dataset.table_B")
>>>>>>>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>>>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>>
>>>>>>> PCollection<TableRow> table_B_records =
>>>>>>>     p.apply(BigQueryIO.readTableRows()
>>>>>>>         .from("project:dataset.table_B"));
>>>>>>>
>>>>>>> table_B_records.apply(BigQueryIO.writeTableRows()
>>>>>>>     .to("project:dataset.table_C")
>>>>>>>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>>>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>>
>>>>>>> p.run().waitUntilFinish();
>>>>>>>
>>>>>>> It basically creates two parallel job graphs in Dataflow,
>>>>>>>
>>>>>>> A -> B
>>>>>>> B -> C
>>>>>>>
>>>>>>> instead of the single chained transformation I expected. I need to
>>>>>>> create a data pipeline in which the data flows in a chain, like:
>>>>>>>
>>>>>>>           D
>>>>>>>          /
>>>>>>> A -> B -> C
>>>>>>>          \
>>>>>>>           E
>>>>>>>
>>>>>>> Is there a way to achieve this transformation between the source
>>>>>>> and target tables?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ravi
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Ravi Kapoor
>>>>>> +91-9818764564
>>>>>> kapoorrav...@gmail.com
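To make Jack's suggestion concrete for the A-join-B use case above: write the joined PCollection to C, and branch the filter off that same PCollection instead of reading C back. A sketch with assumed names (`joinAWithB` and the `keepRow` predicate are placeholder helpers, not Beam APIs):

```java
// joined: the result of joining A and B in-pipeline (e.g. via CoGroupByKey
// or the Join library); joinAWithB is an assumed helper.
PCollection<TableRow> joined = joinAWithB(tableARecords, tableBRecords);

// Branch 1: persist the joined rows to C.
joined.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table_C")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

// Branch 2: filter the same in-pipeline data and write it to D.
// There is no read of table_C, so the graph stays connected.
joined
    .apply(Filter.by(row -> keepRow(row))) // keepRow: assumed predicate
    .apply(BigQueryIO.writeTableRows()
        .to("project:dataset.table_D")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
```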
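The same idea fixes the A -> B -> C pipeline quoted above: keep the write to table_B as a staging branch, but feed table_A_records (not a fresh read of table_B) into the next stage, so Dataflow sees one connected graph. A sketch; the identity MapElements is a placeholder for whatever staging-to-target transform is actually needed:

```java
PCollection<TableRow> table_A_records =
    p.apply(BigQueryIO.readTableRows()
        .from("project:dataset.table_A"));

// Staging write to B -- a branch of the graph, not a barrier.
table_A_records.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table_B")
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

// Continue from the in-pipeline data instead of re-reading table_B.
table_A_records
    .apply(MapElements.into(TypeDescriptor.of(TableRow.class))
        .via(row -> row)) // placeholder transform on the staged data
    .apply(BigQueryIO.writeTableRows()
        .to("project:dataset.table_C")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();
```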