Hi. Can someone help me with this?
On Wed, Apr 19, 2023 at 15:08, Juan Romero (<[email protected]>) wrote:
> Hi community.
>
> This time I have a question about how to read a stream from Kafka
> and write batches of data with the JDBC connector. The idea is to overwrite
> a specific row if the incoming row has the same id and a greater
> load_date_time. The conceptual pipeline looks like this and it is working
> (keep in mind that the real source will be a Kafka stream; see the sketch
> after the pipeline):
>
> import typing
>
> import apache_beam as beam
> from apache_beam import coders
> from apache_beam.io.jdbc import WriteToJdbc
>
> ExampleRow = typing.NamedTuple(
>     'ExampleRow', [('id', int), ('name', str), ('load_date_time', str)])
>
> # Register a RowCoder so the cross-language JDBC transform can encode the rows.
> coders.registry.register_coder(ExampleRow, coders.RowCoder)
>
> with beam.Pipeline() as p:
>     _ = (
>         p
>         | beam.Create([
>             ExampleRow(1, 'zzzz', '2023-04-05 12:34:56'),
>             ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
>         ]).with_output_types(ExampleRow)
>         | 'Write to jdbc' >> WriteToJdbc(
>             driver_class_name='org.postgresql.Driver',
>             jdbc_url='jdbc:postgresql://localhost:5432/postgres',
>             username='postgres',
>             password='postgres',
>             table_name='test',
>             connection_properties='stringtype=unspecified',
>             # Upsert: keep the row with the most recent load_date_time.
>             statement='INSERT INTO test '
>                       'VALUES(?,?,?) '
>                       'ON CONFLICT (id) '
>                       'DO UPDATE SET name = EXCLUDED.name, '
>                       'load_date_time = EXCLUDED.load_date_time '
>                       'WHERE EXCLUDED.load_date_time::timestamp > '
>                       'test.load_date_time::timestamp',
>         ))
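>
> For context, when I move to the real source I expect to replace beam.Create
> with something roughly like this (a sketch only: the topic, the brokers, and
> the JSON value format are placeholder assumptions, not part of the pipeline
> above):
>
> import json
>
> from apache_beam.io.kafka import ReadFromKafka
>
> rows = (
>     p
>     | 'Read from kafka' >> ReadFromKafka(
>         consumer_config={'bootstrap.servers': 'localhost:9092'},
>         topics=['test_topic'])
>     # ReadFromKafka yields (key, value) pairs of bytes; parse each value
>     # into the ExampleRow schema expected by WriteToJdbc.
>     | 'To ExampleRow' >> beam.Map(
>         lambda kv: ExampleRow(**json.loads(kv[1].decode('utf-8'))))
>       .with_output_types(ExampleRow)
> )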
>
> My question is: if I want to write a stream that comes from Kafka, how can I
> keep the JDBC connector from inserting the records one by one and instead
> insert the data in time-based batches? Perhaps JDBC internally has some kind
> of intelligence to do this, but I would like to know what you think about it.
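>
> To make the question concrete, the kind of time-based batching I have in
> mind looks roughly like this (a sketch only, using GroupIntoBatches; the
> batch size and buffering duration are arbitrary, and whether this actually
> changes how WriteToJdbc issues the INSERTs is exactly what I am asking):
>
> batched = (
>     rows  # the PCollection[ExampleRow] coming from Kafka
>     | 'Key by id' >> beam.Map(lambda row: (row.id, row))
>     # Emit a batch once 500 rows are buffered or 60 seconds have passed,
>     # whichever comes first.
>     | 'Batch' >> beam.GroupIntoBatches(
>         batch_size=500, max_buffering_duration_secs=60)
>     # Re-flatten so the elements match the type WriteToJdbc expects.
>     | 'Flatten batches' >> beam.FlatMap(lambda kv: kv[1])
>       .with_output_types(ExampleRow)
> )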
>
> Thank you!
>