Oh, in that case the problem may be the direct runner's implementation of the Pub/Sub source, especially its watermark handling. For a direct-runner test, I recommend using TestStream, which lets you advance the watermark manually so you can test windowing.
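
A minimal sketch of such a test, assuming JUnit 4 and the Beam direct runner are on the test classpath. The class name, method name, and the sample elements are hypothetical. It replaces the Pub/Sub read with a TestStream and only checks the one-minute fixed windowing, so it leaves out MyTransformer, JdbcIO, and Wait.on:

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

// Hypothetical test class; names and sample data are for illustration only.
public class FixedWindowStreamTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void elementsLandInTheirOneMinuteWindows() {
    Instant start = new Instant(0L);
    Duration windowSize = Duration.standardMinutes(1);

    // Stand-in for the Pub/Sub source: the test controls event timestamps
    // and decides exactly when the watermark moves.
    TestStream<String> source =
        TestStream.create(StringUtf8Coder.of())
            .addElements(
                TimestampedValue.of("alice", start.plus(Duration.standardSeconds(10))),
                TimestampedValue.of("bob", start.plus(Duration.standardSeconds(20))))
            // Move the watermark to the end of the first window.
            .advanceWatermarkTo(start.plus(windowSize))
            .addElements(
                TimestampedValue.of("carol", start.plus(Duration.standardSeconds(70))))
            // Close every remaining window so the pipeline can finish.
            .advanceWatermarkToInfinity();

    PCollection<String> windowed =
        pipeline
            .apply(source)
            .apply("Windowing", Window.<String>into(FixedWindows.of(windowSize)));

    // Each assertion is scoped to a single fixed window.
    PAssert.that(windowed)
        .inWindow(new IntervalWindow(start, windowSize))
        .containsInAnyOrder("alice", "bob");
    PAssert.that(windowed)
        .inWindow(new IntervalWindow(start.plus(windowSize), windowSize))
        .containsInAnyOrder("carol");

    pipeline.run();
  }
}
```

Once this passes, the Wait.on step and the downstream transforms could be added one at a time to see which one stops producing output.
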
On Sat, Apr 22, 2023 at 10:28 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:

> I'm developing with the direct runner, but it should go to Dataflow when
> deployed.
>
> -------- Original Message --------
> On Apr 22, 2023, 13:13, Reuven Lax via user <user@beam.apache.org> wrote:
>
> What runner are you using to run this pipeline?
>
> On Sat, Apr 22, 2023 at 9:47 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>
>> Same result:
>>
>> PCollection<String> result = p
>>     .apply("Pubsub",
>>         PubsubIO.readMessagesWithAttributes().fromSubscription(
>>             String.format("projects/%s/subscriptions/%s",
>>                 options.getProjectId(), subscription)))
>>     .apply("Transform", ParDo.of(new MyTransformer()))
>>     .apply("Windowing",
>>         Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>>             .triggering(AfterWatermark.pastEndOfWindow()
>>                 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>                     .plusDelayOf(Duration.standardSeconds(30))))
>>             .withAllowedLateness(Duration.standardMinutes(1))
>>             .discardingFiredPanes());
>>
>> PCollection<Void> insert = result.apply("Inserting",
>>     JdbcIO.<String>write()
>>         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>>         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>>         .withPreparedStatementSetter((element, preparedStatement) -> {
>>             log.info("Preparing statement to insert");
>>             preparedStatement.setString(1, element);
>>         })
>>         .withResults()
>> );
>> result.apply(Wait.on(insert))
>>     .apply("Selecting", new SomeTransform())
>>     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> p.run();
>>
>> I updated the GitHub repo as well.
>>
>> ------- Original Message -------
>> On Saturday, April 22nd, 2023 at 11:18 AM, Reuven Lax via user <user@beam.apache.org> wrote:
>>
>> > The other problem you have here is that you have not set a window.
>> > Wait.on waits for the end of the current window before triggering. The
>> > default Window is the GlobalWindow, so as written Wait.on will wait for
>> > the end of time (or until you drain the pipeline, which will also
>> > trigger the GlobalWindow).
>> >
>> > Try adding a 1-minute fixed window to the results you read from PubSub.
>> >
>> > On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>> >
>> > > writeVoid() and write() plus withResults() return the same
>> > > PCollection<Void> AFAIK. In any case, I updated the code and the same
>> > > thing happens.
>> > >
>> > > PCollection<String> result = p
>> > >     .apply("Pubsub",
>> > >         PubsubIO.readMessagesWithAttributes().fromSubscription(
>> > >             String.format("projects/%s/subscriptions/%s",
>> > >                 options.getProjectId(), subscription)))
>> > >     .apply("Transform", ParDo.of(new MyTransformer()));
>> > >
>> > > PCollection<Void> insert = result.apply("Inserting",
>> > >     JdbcIO.<String>write()
>> > >         .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>> > >         .withStatement("INSERT INTO person (first_name, last_name) VALUES (?, 'doe')")
>> > >         .withPreparedStatementSetter((element, preparedStatement) -> {
>> > >             log.info("Preparing statement to insert");
>> > >             preparedStatement.setString(1, element);
>> > >         })
>> > >         .withResults()
>> > > );
>> > > result.apply(Wait.on(insert))
>> > >     .apply("Selecting", new SomeTransform())
>> > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> > >
>> > > https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>> > >
>> > > ------- Original Message -------
>> > > On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <user@beam.apache.org> wrote:
>> > >
>> > > > I believe you have to call withResults() on the JdbcIO transform in
>> > > > order for this to work.
>> > > >
>> > > > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <jcuz...@protonmail.com> wrote:
>> > > >
>> > > > > I hope you all are doing well. I am facing an issue with an
>> > > > > Apache Beam pipeline that gets stuck indefinitely when using the
>> > > > > Wait.on transform alongside JdbcIO. Here's a simplified version of
>> > > > > my code, focusing on the relevant parts:
>> > > > >
>> > > > > PCollection<String> result = p
>> > > > >     .apply("Pubsub",
>> > > > >         PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
>> > > > >     .apply("Transform", ParDo.of(new MyTransformer()));
>> > > > >
>> > > > > PCollection<Void> insert = result.apply("Inserting",
>> > > > >     JdbcIO.<String>writeVoid()
>> > > > >         .withDataSourceProviderFn(/*...*/)
>> > > > >         .withStatement(/*...*/)
>> > > > >         .withPreparedStatementSetter(/*...*/)
>> > > > > );
>> > > > >
>> > > > > result.apply(Wait.on(insert))
>> > > > >     .apply("Selecting", new SomeTransform())
>> > > > >     .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>> > > > > p.run();
>> > > > >
>> > > > > In the code, I'm using the Wait.on transform to make the pipeline
>> > > > > wait until the insert transform (which uses JdbcIO to write data)
>> > > > > is completed before executing the next steps. However, the
>> > > > > pipeline gets stuck and doesn't progress further.
>> > > > >
>> > > > > I've tried adding logging messages in my transforms to track the
>> > > > > progress and identify where it's getting stuck, but I haven't been
>> > > > > able to pinpoint the issue. I've searched for solutions online,
>> > > > > but none of them resolved my problem.
>> > > > >
>> > > > > Can anyone provide any insights or suggestions on how to debug
>> > > > > and resolve this issue involving Wait.on and JdbcIO in my Apache
>> > > > > Beam pipeline?
>> > > > >
>> > > > > You can find the sample code at: https://github.com/j1cs/app-beam
>> > > > >
>> > > > > Thank you for your help and support.
>> > > > >
>> > > > > Best regards,
>> > > > >
>> > > > > Juan Cuzmar.