The other problem you have here is that you have not set a window. Wait.on
waits for the end of the current window before triggering. The default
Window is the GlobalWindow, so as written Wait.on will wait for the end of
time (or until you drain the pipeline, which will also trigger the
GlobalWindow).

Try adding a 1-minute fixed window to the results you read from PubSub.

On Sat, Apr 22, 2023 at 6:50 AM Juan Cuzmar <[email protected]> wrote:

> writeVoid() and write() plus withResults() return the same
> PCollection<Void> AFAIK. In any case i updated the code and same thing
> happens
>
>  PCollection<String> result = p.
>                 apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(String.format("projects/%s/subscriptions/%s",
> options.getProjectId(), subscription)))
>                 .apply("Transform", ParDo.of(new MyTransformer()));
>
>         PCollection<Void> insert = result.apply("Inserting",
>                 JdbcIO.<String>write()
>
> .withDataSourceProviderFn(DataSourceProvider.of(authDatabaseConfig))
>                         .withStatement("INSERT INTO person (first_name,
> last_name) VALUES (?, 'doe')")
>                         .withPreparedStatementSetter((element,
> preparedStatement) -> {
>                             log.info("Preparing statement to insert");
>                             preparedStatement.setString(1, element);
>                         })
>                         .withResults()
>         );
>         result.apply(Wait.on(insert))
>                 .apply("Selecting", new SomeTransform())
>                 .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
>
> https://github.com/j1cs/app-beam/blob/main/src/main/java/me/jics/AppBeamCommand.java#L63
>
> ------- Original Message -------
> On Saturday, April 22nd, 2023 at 2:08 AM, Reuven Lax via user <
> [email protected]> wrote:
>
>
> > I believe you have to call withResults() on the JdbcIO transform in
> order for this to work.
> >
> > On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar <[email protected]>
> wrote:
> >
> > > I hope you all are doing well. I am facing an issue with an Apache
> Beam pipeline that gets stuck indefinitely when using the Wait.on transform
> alongside JdbcIO. Here's a simplified version of my code, focusing on the
> relevant parts:
> > >
> > > PCollection<String> result = p.
> > > apply("Pubsub",
> PubsubIO.readMessagesWithAttributes().fromSubscription(/*...*/))
> > > .apply("Transform", ParDo.of(new MyTransformer()));
> > >
> > > PCollection<Void> insert = result.apply("Inserting",
> > > JdbcIO.<String>writeVoid()
> > > .withDataSourceProviderFn(/*...*/)
> > > .withStatement(/*...*/)
> > > .withPreparedStatementSetter(/*...*/)
> > > );
> > >
> > > result.apply(Wait.on(insert))
> > > .apply("Selecting", new SomeTransform())
> > > .apply("PubsubMessaging", ParDo.of(new NextTransformer()));
> > > p.run();
> > >
> > > In the code, I'm using the Wait.on transform to make the pipeline wait
> until the insert transform (which uses JdbcIO to write data) is completed
> before executing the next steps. However, the pipeline gets stuck and
> doesn't progress further.
> > >
> > > I've tried adding logging messages in my transforms to track the
> progress and identify where it's getting stuck, but I haven't been able to
> pinpoint the issue. I've searched for solutions online, but none of them
> provided a successful resolution for my problem.
> > >
> > > Can anyone provide any insights or suggestions on how to debug and
> resolve this issue involving Wait.on and JdbcIO in my Apache Beam pipeline?
> > >
> > > You can find the sample code at: https://github.com/j1cs/app-beam
> > >
> > > Thank you for your help and support.
> > >
> > > Best regards,
> > >
> > > Juan Cuzmar.
>

Reply via email to