Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-21 Thread Reuven Lax via user
I believe you have to call withResults() on the JdbcIO transform in order
for this to work.
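
For reference, a rough sketch of what that change could look like, reusing the
placeholders from the snippet quoted below. This is untested and swaps
writeVoid() for write(), on the assumption that withResults() is the variant
that emits a result PCollection usable as the Wait.on signal:

PCollection insert = result.apply("Inserting",
    JdbcIO.write()
        .withDataSourceProviderFn(/*...*/)
        .withStatement(/*...*/)
        .withPreparedStatementSetter(/*...*/)
        // emits a result PCollection instead of PDone, so it can feed Wait.on
        .withResults());

result.apply(Wait.on(insert))
    .apply("Selecting", new SomeTransform())
    .apply("PubsubMessaging", ParDo.of(new NextTransformer()));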

On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar  wrote:

> I hope you all are doing well. I am facing an issue with an Apache Beam
> pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
>
> PCollection result = p.
> apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> .apply("Transform", ParDo.of(new MyTransformer()));
>
> PCollection insert = result.apply("Inserting",
> JdbcIO.writeVoid()
> .withDataSourceProviderFn(/*...*/)
> .withStatement(/*...*/)
> .withPreparedStatementSetter(/*...*/)
> );
>
> result.apply(Wait.on(insert))
> .apply("Selecting", new SomeTransform())
> .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> p.run();
>
> In the code, I'm using the Wait.on transform to make the pipeline wait
> until the insert transform (which uses JdbcIO to write data) is completed
> before executing the next steps. However, the pipeline gets stuck and
> doesn't progress further.
>
> I've tried adding logging messages in my transforms to track the progress
> and identify where it's getting stuck, but I haven't been able to pinpoint
> the issue. I've searched for solutions online, but none of them provided a
> successful resolution for my problem.
>
> Can anyone provide any insights or suggestions on how to debug and resolve
> this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
>
> You can find the sample code at: https://github.com/j1cs/app-beam
>
> Thank you for your help and support.
>
> Best regards,
>
> Juan Cuzmar.
>


Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-21 Thread Juan Cuzmar
I hope you all are doing well. I am facing an issue with an Apache Beam 
pipeline that gets stuck indefinitely when using the Wait.on transform 
alongside JdbcIO. Here's a simplified version of my code, focusing on the 
relevant parts:

PCollection result = p.
apply("Pubsub", 
PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
.apply("Transform", ParDo.of(new MyTransformer()));

PCollection insert = result.apply("Inserting",
JdbcIO.writeVoid()
.withDataSourceProviderFn(/*...*/)
.withStatement(/*...*/)
.withPreparedStatementSetter(/*...*/)
);

result.apply(Wait.on(insert))
.apply("Selecting", new SomeTransform())
.apply("PubsubMessaging", ParDo.of(new NextTransformer()));
p.run();

In the code, I'm using the Wait.on transform to make the pipeline wait until 
the insert transform (which uses JdbcIO to write data) is completed before 
executing the next steps. However, the pipeline gets stuck and doesn't progress 
further.

I've tried adding logging messages in my transforms to track the progress and 
identify where it's getting stuck, but I haven't been able to pinpoint the 
issue. I've searched for solutions online, but none of them provided a 
successful resolution for my problem.

Can anyone provide any insights or suggestions on how to debug and resolve this 
issue involving Wait.on and JdbcIO in my Apache Beam pipeline?

You can find the sample code at: https://github.com/j1cs/app-beam

Thank you for your help and support.

Best regards,

Juan Cuzmar.


Re: apache beam bigquery IO connector support for bigquery external tables

2023-04-21 Thread Brian Hulette via user
Hi Nirav,
BQ external tables are read-only, so you won't be able to write this way. I
also don't think reading a standard external table will work since the Read
API and tabledata.list are not supported for external tables [1].

BigLake tables [2] on the other hand, may "just work". I haven't
checked this though, and it would still be read-only.
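
If anyone wants to try that, here is a minimal, untested sketch of reading a
BigLake table the same way as a native table with the Java SDK. The table
reference is a placeholder, and it assumes the Storage Read API works for
BigLake:

PCollection<TableRow> rows = pipeline.apply("ReadBigLakeTable",
    BigQueryIO.readTableRows()
        // placeholder table reference
        .from("my-project:my_dataset.my_biglake_table")
        // read via the Storage Read API rather than an export job
        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ));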

Brian

[1] https://cloud.google.com/bigquery/docs/external-tables#limitations
[2] https://cloud.google.com/bigquery/docs/biglake-intro

On Tue, Apr 18, 2023 at 9:33 AM Nirav Patel  wrote:

> Beam has BigQuery IO support for reading from and writing to BQ tables. I
> am assuming it only supports BigQuery internal tables, but does anyone know
> whether it supports writing to and reading from BQ external tables
> (Parquet or Iceberg)? I think reading should be possible, but I haven't
> tried it myself.
>


Re: [java] Trouble with gradle and using ParquetIO

2023-04-21 Thread Evan Galpin
Oops, I was looking at the "bootleg" mvnrepository search engine, which
shows `compileOnly` in the copy-pastable dependency installation
prompts[1].  When I received the "ClassNotFound" error, my thought was that
the dep should be installed in "implementation" mode.  When I tried that, I
got other, stranger errors when I tried to run my pipeline:
"java.lang.NoClassDefFoundError: Could not initialize class
org.apache.beam.sdk.coders.CoderRegistry".

My deps are like so:
implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
implementation "org.apache.beam:beam-sdks-java-io-parquet:${beamVersion}"
...

Not sure why the CoderRegistry error comes up at runtime when both of the
above deps are included.

[1]
https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0

On Fri, Apr 21, 2023 at 2:34 AM Alexey Romanenko 
wrote:

> Just curious: where was it documented like this?
>
> I briefly checked it on Maven Central [1] and the provided code snippet
> for Gradle uses “implementation” scope.
>
> —
> Alexey
>
> [1]
> https://search.maven.org/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0/jar
>
> > On 21 Apr 2023, at 01:52, Evan Galpin  wrote:
> >
> > Hi all,
> >
> > I'm trying to make use of ParquetIO.  Based on what's documented in
> maven central, I'm including the artifact in "compileOnly" mode (or in
> maven parlance, 'provided' scope).  I can successfully compile my pipeline,
> but when I run it I (intuitively?) am met with a ClassNotFound exception
> for ParquetIO.
> >
> > Is 'compileOnly' still the desired way to include ParquetIO as a
> pipeline dependency?
> >
> > Thanks,
> > Evan
>
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-21 Thread Ning Kang via user
Hi Jan,

To generalize the per-stage parallelism configuration, we should have a FR
proposing the capability to explicitly set an autoscaling policy (in this
case, a fixed size per stage) in Beam pipelines.

Per-step or per-stage parallelism, or fusion/optimization is not part of
the Beam model. They are [Flink] runner implementation details and should
be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like
until the pipeline is submitted to a runner, thus making configuration of
the parallelism/worker-per-stage not straightforward.
Flink's parallelism settings are documented on the Flink side, but it's
still kind of a black box since you don't really know how many tasks are
actually spawned until you run a pipeline.
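
For reference, the knob that exists today is pipeline-wide. A minimal sketch
of how it is typically set with the Java SDK and the classic Flink runner
(nothing here is per-stage):

// Equivalent to --runner=FlinkRunner --parallelism=8 on the command line;
// the same parallelism applies to every operator in the job.
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setParallelism(8);
Pipeline p = Pipeline.create(options);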

That being said, if we have a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in their own way.
For example, in a Flink job, each operator/stage's task slot is prorated by
their key numbers; the maximum parallelism is throttled by task slot
utilization.
Another example, in a Dataflow job, each stage horizontally scales by CPU
utilization; vertically scales by memory/disk utilization.

+d...@beam.apache.org 
Let's use this thread to discuss how to configure a pipeline for runners so
that they can scale workers appropriately without exposing runner-specific
details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský  wrote:

> Hi Ning,
>
> I might have missed that in the discussion, but we talk about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline are run in the same slots, thus the downsides are limited. You can
> enforce streaming mode using --streaming command-line argument. But yes,
> this might have other implications. For batch only it obviously makes sense
> to limit parallelism of a (fused) 'stage', which is not a transform-level
> concept, but rather a more complex union of transforms divided by shuffle
> barrier. Would you be willing to start a follow-up thread in @dev mailing
> list for this for deeper discussion?
>
>  Jan
> On 4/20/23 19:18, Ning Kang via user wrote:
>
> Hi Jan,
>
> The approach works when your pipeline doesn't have too many operators. And
> the operator that needs the highest parallelism can only use at most
> #total_task_slots / #operators resources available in the cluster.
>
> Another downside is wasted resources for other smaller operators who
> cannot make full use of task slots assigned to them. You might see only
> 1/10 tasks running while the other 9/10 tasks idle for an operator with
> parallelism 10, especially when it's doing some aggregation like a SUM.
>
> One redeeming method is that, for operators following another operator
> with high fanout, we can explicitly add a Reshuffle to allow a higher
> parallelism. But this circles back to the first downside: if your pipeline
> has exponentially high fanout through it, setting a single parallelism for
> the whole pipeline is not ideal because it limits the scalability of your
> pipeline significantly.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> this topic was discussed many years ago and the conclusion there was that
>> setting the parallelism of individual operators via FlinkPipelineOptions
>> (or ResourceHints) would be possible, but somewhat cumbersome.
>> Although I understand that it "feels" weird to have high parallelism for
>> operators with small inputs, does this actually bring any relevant
>> performance impact? I always use parallelism based on the largest operator
>> in the Pipeline and this seems to work just fine. Is there any particular
>> need or measurable impact of such approach?
>>
>>  Jan
>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>
>> Same need here, using Flink runner. We are processing a pcollection
>> (extracting features per element) then combining these into groups of
>> features and running the next operator on those groups.
>>
>> Each group contains ~50 elements, so the parallelism of the operator
>> upstream of the groupby should be higher, to be balanced with the
>> downstream operator.
>>
>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:
>>
>>> Hi Reuven,
>>>
>>> It would be better to set parallelism per operator; as I mentioned
>>> before, there may be multiple groupby and join operators in one pipeline, and
>>> their parallelism can be different due to different input data sizes.
>>>
>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax  wrote:
>>>
 Jeff - does setting the global default work for you, or do you need
 per-operator control? Seems like it would be natural to add this to ResourceHints.

 On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw 
 wrote:

> Yeah, I don't think we have a good per-operator API for this. If we
> were to add it, it probably belongs in ResourceHints.
>

Re: How Beam Pipeline Handle late events

2023-04-21 Thread Pavel Solomin
Thank you for the information.

I'm assuming you had a unique ID in the records, and you observed some IDs
missing in the Beam output compared with Spark, and not just some duplicates
produced by Spark.

If so, I would suggest creating a P1 issue at
https://github.com/apache/beam/issues

Also, did you try setting --checkpointingMode=AT_LEAST_ONCE ?

Unfortunately, I can't be more helpful here, but let me share some of the
gotchas from my previous experience of running Beam on top of Flink for a
similar use case (landing data from a messaging system into files):

(1) https://github.com/apache/beam/issues/26041 - I solved that by adding a
runId, re-generated between app (re)starts, into the file names

(2) I used processing-time watermarks and a simple window with no lateness
configured - combining that with (1) achieved no data loss

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin






On Thu, 20 Apr 2023 at 02:18, Lydian  wrote:

> Yes, we did enable this in our pipeline.
>
> On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin 
> wrote:
>
>> Thank you
>>
>> Just to confirm: how did you configure Kafka offset commits? Did you have
>> this flag enabled?
>>
>>
>>
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>>
>>
>> On Thursday, 20 April 2023, Trevor Burke  wrote:
>> > Hi Pavel,
>> > Thanks for the reply.
>> > No, the event losses are not consistent. While we've been running our
>> pipelines in parallel (Beam vs Spark), we are seeing some days with no event
>> loss and some days with some, but it's always less than 0.05%
>> >
>> >
>> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin 
>> wrote:
>> >>
>> >> Hello Lydian,
>> >> Do you always observe data loss? Or does it maybe happen only when you
>> restart your pipeline from a Flink savepoint? If you lose data only between
>> restarts - is your issue similar to
>> https://github.com/apache/beam/issues/26041 ?
>> >>
>> >> Best Regards,
>> >> Pavel Solomin
>> >>
>> >> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> >>
>> >>
>> >>
>> >>
>> >> On Tue, 18 Apr 2023 at 18:58, Lydian  wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> We are using Beam (Python SDK + Flink Runner) to back up our streaming
>> data from Kafka to S3. To avoid hitting the S3 threshold, we use a 1-minute
>> fixed window to group messages.  We had a similar pipeline in Spark that
>> we want to replace with this new pipeline.  However, the Beam pipeline
>> always seems to be missing events, which we think could be due to late
>> events (because the number of missing events gets lower with a higher
>> allow_lateness).
>> >>>
>> >>> We've tried the following approaches to avoid late events, but none of
>> them are working:
>> >>> 1.  Use the processing timestamp instead of event time. Ideally, if
>> windowing uses the processing timestamp, it shouldn't consider any
>> event as late. But this doesn't seem to work at all.
>> >>> 2.  Configure allow_lateness to 12 hours.  Given that approach 1 doesn't
>> seem to work as expected, we've also configured allow_lateness. But there
>> are still missing events compared to our old Spark pipelines.
>> >>>
>> >>> Here's the simplified code we have:
>> >>> ```
>> >>> def add_timestamp(event: Any) -> Any:
>> >>>     import time
>> >>>     from apache_beam import window
>> >>>     return window.TimestampedValue(event, time.time())
>> >>>
>> >>> (pipeline
>> >>>  | "Kafka Read" >> ReadFromKafka(topic="test-topic",
>> >>>                                  consumer_config=consumer_config)
>> >>>  | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>> >>>  | "Window into Fixed Intervals" >> beam.WindowInto(
>> >>>      beam.window.FixedWindows(fixed_window_size),
>> >>>      allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness))
>> >>>  | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
>> >>> ```
>> >>>
>> >>> I am wondering:
>> >>> 1. Does the add_timestamp approach correctly mark events to use
>> processing time for windowing?  If so, why are there still late events,
>> considering we are using processing time and not event time?
>> >>> 2.  Are there any other approaches to avoid dropping late events
>> besides `allowed_lateness`?  In Flink you can output late events as a side
>> output; can we do something similar in Beam as well? Could someone provide
>> a code example?
>> >>>
>> >>> Could someone help us debugging this?  Thanks!
>> >>>
>> >>> ---
>> >>> * Flink's documentation about late event as side output:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>> >>>
>> >>>
>> >>> Sincerely,
>> >>> Lydian Lee
>> >
>> >
>> > --
>> > Trevor Burke (he/him)   |   Software Engineer, Dat

Re: [java] Trouble with gradle and using ParquetIO

2023-04-21 Thread Alexey Romanenko
Just curious: where was it documented like this?

I briefly checked it on Maven Central [1] and the provided code snippet for 
Gradle uses “implementation” scope.

—
Alexey

[1] 
https://search.maven.org/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0/jar

> On 21 Apr 2023, at 01:52, Evan Galpin  wrote:
> 
> Hi all,
> 
> I'm trying to make use of ParquetIO.  Based on what's documented in maven 
> central, I'm including the artifact in "compileOnly" mode (or in maven 
> parlance, 'provided' scope).  I can successfully compile my pipeline, but 
> when I run it I (intuitively?) am met with a ClassNotFound exception for 
> ParquetIO.
> 
> Is 'compileOnly' still the desired way to include ParquetIO as a pipeline 
> dependency? 
> 
> Thanks,
> Evan



Re: Can I batch data when i use JDBC write operation?

2023-04-21 Thread Wiśniowski Piotr

Hi,

Not tested, but a few options that might be solutions for your problem:

1. Go with read and write replicas of your DB, so that the write replica
gets inserts one by one, and live with this. Make sure to deduplicate the
data before inserting to avoid potential collisions (this should not be a
problem, but I am not sure how the subsystem would behave).


2.

- Add a step to group the input data into a time window

- then ingest the events from a window into a unique temp table (just with a
plain `INSERT INTO`)

- then add a next step in the pipeline to trigger a merge operation from the
tmp table into your production table. Make sure the same connection session
is used, or the tmp table will be gone. I am also not sure whether it is
possible to invoke only one command per window by using the `WriteToJdbc`
operator after the previous write finishes - this is up for your
experimentation; maybe someone with more experience knows how to code it?


3. Another option is to

- aggregate events into a window, so that only one element (an array of
events) is emitted per window

- try somehow to put this record into the statement as a single row

- in subsequent CTEs, deserialize the array into multiple rows

- do the insert with an update.

So the SQL on the DB side would look something like:

```

WITH new_values (arr) AS (
    VALUES (?)
), deser AS (
    SELECT explode(arr) FROM new_values
), upsert AS (
    UPDATE mytable m
    SET field1 = nv.field1, field2 = nv.field2
    FROM deser nv
    WHERE m.id = nv.id
    RETURNING m.*
)
INSERT INTO mytable (id, field1, field2)
SELECT id, field1, field2 FROM deser
WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = new_values.id)
```

The above is just pseudocode that I did not test, but it could be a hint for
you. There is also a great answer on this here:
https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291


4. Some mix of options 2 and 3.

Hopefully this helps you break down the problem.

Best regards

Wiśniowski Piotr

On 21.04.2023 01:34, Juan Romero wrote:

Hi. Can someone help me with this?

On Wed, Apr 19, 2023 at 15:08, Juan Romero ()
wrote:


Hi community.

On this occasion I have a question about how to read a stream
from Kafka and write batches of data with the JDBC connector. The
idea is to overwrite a specific row if the incoming row has the
same id and a greater load_date_time. The conceptual pipeline
looks like this and it is working (keep in mind that the source
will be a stream from Kafka):

ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(
            [
                ExampleRow(1, '', '2023-04-05 12:34:56'),
                ExampleRow(1, 'yyyz', '2023-04-05 12:34:55')
            ]).with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/postgres',
            username='postgres',
            password='postgres',
            table_name='test',
            connection_properties="stringtype=unspecified",
            statement='INSERT INTO test \
                VALUES(?,?,?) \
                ON CONFLICT (id) \
                DO UPDATE SET name = EXCLUDED.name, load_date_time = EXCLUDED.load_date_time \
                WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp',
        ))

My question is: if I want to write a stream that comes from Kafka,
how can I avoid the JDBC connector inserting the records one
statement at a time, and instead insert the data in time-based
batches? Probably JDBC internally has some kind of "intelligence"
for doing this, but I want to know what you think about it.

Thank you!


Re: [java] Trouble with gradle and using ParquetIO

2023-04-21 Thread Wiśniowski Piotr

Hi Evan,

Just to have full knowledge:

- "provided" should be used when You expect the target cluster on 
environment to have the package of interest installed so you do not have 
to include it in the pipeline jar (this is to have it more lightweight 
and easier to maintain coherent target jre env across organization).


- it seems that You should either install the library on You target env 
or include it in your build jar. Up to Your specific use case. Typically 
corporation envs provide commonly used libs in their envs like spark, 
and IO libs - and this might be the reason that maven suggest this.


Best

Wiśniowski Piotr

On 21.04.2023 08:30, Moritz Mack wrote:


Hi Evan,

Not sure why maven suggests using “compileOnly”.

That’s certainly wrong, make sure to use “implementation” in your case.

Cheers, Moritz

On 21.04.23, 01:52, "Evan Galpin"  wrote:

Hi all,

I'm trying to make use of ParquetIO.  Based on what's documented in 
maven central, I'm including the artifact in "compileOnly" mode (or in 
maven parlance, 'provided' scope).  I can successfully compile my 
pipeline, but when I run it I (intuitively?) am met with a 
ClassNotFound exception for ParquetIO.


Is 'compileOnly' still the desired way to include ParquetIO as a 
pipeline dependency?


Thanks,

Evan

