It may also be helpful to explore CoGroupByKey as a way of joining data,
though depending on the shape of the data doing so may not fit in mem:
https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/

- Evan

On Wed, Jun 15, 2022 at 3:45 PM Bruno Volpato <brunocvcu...@gmail.com>
wrote:

> Hello,
>
> I am not sure what is the context behind your join, but I just wanted to
> point out that Beam SQL [1] or the Join-library extension [2] may be
> helpful in your scenario to avoid changing semantics or the need to
> orchestrate your jobs outside Beam.
>
> [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
> [2] https://beam.apache.org/documentation/sdks/java-extensions/
>
>
> Best,
> Bruno
>
>
>
> On Wed, Jun 15, 2022 at 3:35 PM Jack McCluskey <jrmcclus...@google.com>
> wrote:
>
>> Hey Ravi,
>>
>> The problem you're running into is that the act of writing data to a
>> table and reading from it are not joined actions in the Beam model. There's
>> no connecting PCollection tying those together, so they are split and run
>> in parallel. If you want to do this and need the data written to C, you
>> should re-use the PCollection written to C in your filtering step instead
>> of reading the data from C again. That should produce the graph you're
>> looking for in a batch context.
>>
>> Thanks,
>>
>> Jack McCluskey
>>
>> On Wed, Jun 15, 2022 at 3:30 PM Ravi Kapoor <kapoorrav...@gmail.com>
>> wrote:
>>
>>> FYI
>>>
>>> On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor <kapoorrav...@gmail.com>
>>> wrote:
>>>
>>>> Hi Daniel,
>>>>
>>>> I have a use case where I join two tables say A and B and write the
>>>> joined Collection to C.
>>>> Then I would like to filter some records on C and put it to another
>>>> table say D.
>>>> So, the pipeline on Dataflow UI should look like this right?
>>>>
>>>> A
>>>>    \
>>>>     C -> D
>>>>    /
>>>> B
>>>>
>>>> However, the pipeline is writing C -> D in parallel.
>>>> How can this pipeline run in parallel as data has not been pushed yet
>>>> to C by the previous pipeline?
>>>>
>>>> Even when I ran this pipeline, Table D did not get any records inserted
>>>> as well which is apparent.
>>>> Can you help me with this use case?
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>>
>>>>
>>>> On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins <dpcoll...@google.com>
>>>> wrote:
>>>>
>>>>> Can you speak to what specifically you want to be different? The job
>>>>> graph you see, with the A -> B and B-> C being separate is an accurate
>>>>> reflection of your pipeline. table_B is outside of the beam model, by
>>>>> pushing your data there, Dataflow has no ability to identify that no
>>>>> manipulation of data is happening at table_B.
>>>>>
>>>>> If you want to just process data from A to destinations D and E, while
>>>>> writing an intermediate output to table_B, you should just remove the read
>>>>> from table B and modify table_A_records again directly. If that is not 
>>>>> what
>>>>> you want, you would need to explain more specifically what you want that 
>>>>> is
>>>>> different. Is it a pure UI change? Is it a functional change?
>>>>>
>>>>> -Daniel
>>>>>
>>>>> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor <kapoorrav...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Team,
>>>>>> Any update on this?
>>>>>>
>>>>>> On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor <kapoorrav...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I am currently using Beam in my project with Dataflow Runner.
>>>>>>> I am trying to create a pipeline where the data flows from the
>>>>>>> source to staging then to target such as:
>>>>>>>
>>>>>>> A (Source) -> B(Staging) -> C (Target)
>>>>>>>
>>>>>>> When I create a pipeline as below:
>>>>>>>
>>>>>>> PCollection<TableRow> table_A_records = 
>>>>>>> p.apply(BigQueryIO.readTableRows()
>>>>>>>         .from("project:dataset.table_A"));
>>>>>>>
>>>>>>> table_A_records.apply(BigQueryIO.writeTableRows().
>>>>>>>         to("project:dataset.table_B")
>>>>>>>         
>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>>>         
>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>>
>>>>>>> PCollection<TableRow> table_B_records = 
>>>>>>> p.apply(BigQueryIO.readTableRows()
>>>>>>>         .from("project:dataset.table_B"));
>>>>>>> table_B_records.apply(BigQueryIO.writeTableRows().
>>>>>>>         to("project:dataset.table_C")
>>>>>>>         
>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>>>         
>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>> p.run().waitUntilFinish();
>>>>>>>
>>>>>>>
>>>>>>> It basically creates two parallel job graphs in dataflow instead
>>>>>>> creating a transformation as expected:
>>>>>>> A -> B
>>>>>>> B -> C
>>>>>>> I needed to create data pipeline which flows the data in chain like:
>>>>>>>                      D
>>>>>>>                    /
>>>>>>> A -> B -> C
>>>>>>>                   \
>>>>>>>                     E
>>>>>>> Is there a way to achieve this transformation in between source and
>>>>>>> target tables?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ravi
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Ravi Kapoor
>>>>>> +91-9818764564 <+91%2098187%2064564>
>>>>>> kapoorrav...@gmail.com
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Ravi Kapoor
>>>> +91-9818764564 <+91%2098187%2064564>
>>>> kapoorrav...@gmail.com
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Ravi Kapoor
>>> +91-9818764564 <+91%2098187%2064564>
>>> kapoorrav...@gmail.com
>>>
>>

Reply via email to