Oh, in that case the problem may be the direct runner's implementation of
the Pub/Sub source, especially its watermark handling. For a direct-runner
test, I recommend using TestStream, which lets you advance the watermark
manually so you can test windowing.
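
As a rough illustration of the TestStream approach, here is a minimal sketch
that swaps the Pub/Sub source for a TestStream and applies the same 1-minute
fixed window. The class name, element values, and timestamps are made up for
the example. It assumes the Beam Java SDK and the direct runner are on the
classpath, and it leaves out JdbcIO and Wait.on entirely.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical class name; not part of the pipeline discussed in this thread.
final class TestStreamSketch {

  static boolean runSketch() {
    // With no runner specified, Beam falls back to the direct runner
    // (assumed to be on the classpath).
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    Instant start = new Instant(0L);
    Duration oneMinute = Duration.standardMinutes(1);

    // Stand-in for the Pub/Sub read: two elements in the first minute, then
    // the watermark is moved past the end of that window by hand.
    TestStream<String> input =
        TestStream.create(StringUtf8Coder.of())
            .addElements(
                TimestampedValue.of("alice", start),
                TimestampedValue.of("bob", start.plus(Duration.standardSeconds(10))))
            .advanceWatermarkTo(start.plus(oneMinute))
            .addElements(
                TimestampedValue.of("carol", start.plus(Duration.standardSeconds(70))))
            .advanceWatermarkToInfinity();

    PCollection<String> windowed =
        p.apply("TestInput", input)
            .apply("Windowing", Window.<String>into(FixedWindows.of(oneMinute)));

    // Each assertion is restricted to a single one-minute window.
    PAssert.that(windowed)
        .inWindow(new IntervalWindow(start, oneMinute))
        .containsInAnyOrder("alice", "bob");
    PAssert.that(windowed)
        .inWindow(new IntervalWindow(start.plus(oneMinute), oneMinute))
        .containsInAnyOrder("carol");

    PipelineResult.State state = p.run().waitUntilFinish();
    return state == PipelineResult.State.DONE;
  }

  public static void main(String[] args) {
    System.out.println("Pipeline finished cleanly: " + runSketch());
  }
}
```

Because the watermark only moves when the test says so, the first window
closes at the advanceWatermarkTo call rather than whenever the source happens
to report progress. That is the point at which a downstream Wait.on should be
able to release that window's elements.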

On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:

> I'm developing with the direct runner, but it should run on Dataflow when
> deployed.
>
>
> -------- Original Message --------
> On Apr 22, 2023, 13:13, Reuven Lax via user <user@beam.apache.org> wrote:
>
>
> What runner are you using to run this pipeline?
>
> On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com>
> wrote:
>
>> Same result:
>> PCollection<String> result = p
>>         .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
>>                 .fromSubscription(String.format("projects/%s/subscriptions/%s",
>>                         options.getProjectId(), subscription)))
>>         .apply("Transform", ParDo.of(new MyTransformer()))
>>         .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>>                 .triggering(AfterWatermark.pastEndOfWindow()
>>                         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>                                 .plusDelayOf(Duration.standardSeconds(30))))
>>                 .withAllowedLateness(Duration.standardMinutes(1))
>>                 .discardingFiredPanes());
>>
>> PCollection<Void> insert = result.apply("Inserting",
>>         JdbcIO.<String>write()
>>                 .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>                 .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>                 .withPreparedStatementSetter((element, preparedStatement) -> {
>>                     log.info("Preparing statement to insert");
>>                     preparedStatement.setString(1, element);
>>                 })
>>                 .withResults());
>> result.apply(Wait.on(insert))
>>         .apply("Selecting", new SomeTransform())
>>         .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> p.run();
>>
>> I updated the GitHub repo as well.
>>
>> ------- Original Message -------
>> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>
>>
>> > The other problem you have here is that you have not set a window.
>> > Wait.on waits for the end of the current window before triggering. The
>> > default Window is the GlobalWindow, so as written Wait.on will wait for
>> > the end of time (or until you drain the pipeline, which will also
>> > trigger the GlobalWindow).
>> > Try adding a 1-minute fixed window to the results you read from PubSub.
>> >
>> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com>
>> wrote:
>> >
>> > > As far as I know, writeVoid() and write() plus withResults() both
>> > > return a PCollection<Void>. In any case, I updated the code and the
>> > > same thing happens:
>> > >
>> > > PCollection<String> result = p
>> > >         .apply("Pubsub", PubsubIO.readMessagesWithAttributes()
>> > >                 .fromSubscription(String.format("projects/%s/subscriptions/%s",
>> > >                         options.getProjectId(), subscription)))
>> > >         .apply("Transform", ParDo.of(new MyTransformer()));
>> > >
>> > > PCollection<Void> insert = result.apply("Inserting",
>> > >         JdbcIO.<String>write()
>> > >                 .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>> > >                 .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>> > >                 .withPreparedStatementSetter((element, preparedStatement) -> {
>> > >                     log.info("Preparing statement to insert");
>> > >                     preparedStatement.setString(1, element);
>> > >                 })
>> > >                 .withResults());
>> > > result.apply(Wait.on(insert))
>> > >         .apply("Selecting", new SomeTransform())
>> > >         .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> > >
>> > > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>> > >
>> > > ------- Original Message -------
>> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
>> user@beam.apache.org> wrote:
>> > >
>> > >
>> > > > I believe you have to call withResults() on the JdbcIO transform
>> > > > in order for this to work.
>> > > >
>> > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <
>> jcuz...@protonmail.com> wrote:
>> > > >
>> > > > > I hope you all are doing well. I am facing an issue with an Apache
>> > > > > Beam pipeline that gets stuck indefinitely when using the Wait.on
>> > > > > transform alongside JdbcIO. Here's a simplified version of my code,
>> > > > > focusing on the relevant parts:
>> > > > >
>> > > > > PCollection<String> result = p
>> > > > >         .apply("Pubsub", PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
>> > > > >         .apply("Transform", ParDo.of(new MyTransformer()));
>> > > > >
>> > > > > PCollection<Void> insert = result.apply("Inserting",
>> > > > > JdbcIO.<String>writeVoid()
>> > > > > .withDataSourceProviderFn(/*...*/)
>> > > > > .withStatement(/*...*/)
>> > > > > .withPreparedStatementSetter(/*...*/)
>> > > > > );
>> > > > >
>> > > > > result.apply(Wait.on(insert))
>> > > > > .apply("Selecting", new SomeTransform())
>> > > > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> > > > > p.run();
>> > > > >
>> > > > > In the code, I'm using the Wait.on transform to make the pipeline
>> > > > > wait until the insert transform (which uses JdbcIO to write data)
>> > > > > is completed before executing the next steps. However, the
>> > > > > pipeline gets stuck and doesn't progress further.
>> > > > >
>> > > > > I've tried adding logging messages in my transforms to track the
>> > > > > progress and identify where it's getting stuck, but I haven't been
>> > > > > able to pinpoint the issue. I've searched for solutions online, but
>> > > > > none of them provided a successful resolution for my problem.
>> > > > >
>> > > > > Can anyone provide any insights or suggestions on how to debug and
>> > > > > resolve this issue involving Wait.on and JdbcIO in my Apache Beam
>> > > > > pipeline?
>> > > > >
>> > > > > You can find the sample code at: https://github.com/j1cs/app-beam
>> > > > >
>> > > > > Thank you for your help and support.
>> > > > >
>> > > > > Best regards,
>> > > > >
>> > > > > Juan Cuzmar.
>>
>
