Hi,

Had to dig into it a bit; took a few moments :)

Right now, there isn't a way to use Wayang's JDBC stack to run a stage with 
multiple logical sources. JdbcExecutor 
(wayang-platforms/wayang-jdbc-template/.../JdbcExecutor.java [lines 157-228]) 
and its generic variant enforce exactly one start task and limit the stage to a 
single table source plus optional filters, projection, and joins. 
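
To make the restriction concrete, here is a simplified, hypothetical model of that check. It does not reproduce the actual JdbcExecutor code, and `OpKind`, `StageTask` and `StageValidator` are invented names. A start task is a task with no producer inside the stage. The model accepts a stage only if there is exactly one start task and it is a table source. In your plan, both table sources would presumably be start tasks of the same stage, so the model rejects it.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a JDBC stage. None of these types exist in Wayang.
enum OpKind { TABLE_SOURCE, FILTER, PROJECTION, JOIN }

final class StageTask {
    final String name;
    final OpKind kind;
    // Names of this task's producers that live inside the same stage.
    final List<String> inStageInputs;

    StageTask(String name, OpKind kind, List<String> inStageInputs) {
        this.name = name;
        this.kind = kind;
        this.inStageInputs = inStageInputs;
    }
}

final class StageValidator {
    // A start task has no producer inside the stage.
    static List<StageTask> startTasks(List<StageTask> stage) {
        List<StageTask> starts = new ArrayList<>();
        for (StageTask task : stage) {
            if (task.inStageInputs.isEmpty()) {
                starts.add(task);
            }
        }
        return starts;
    }

    // Accept the stage only if it has exactly one start task and that task is a table source.
    static StageTask validateSingleSource(List<StageTask> stage) {
        List<StageTask> starts = startTasks(stage);
        if (starts.isEmpty()) {
            throw new IllegalStateException("Invalid jdbc stage: no source task");
        }
        if (starts.size() > 1) {
            throw new IllegalStateException(
                    "Invalid jdbc stage: multiple sources are not currently supported");
        }
        StageTask source = starts.get(0);
        if (source.kind != OpKind.TABLE_SOURCE) {
            throw new IllegalStateException("Invalid jdbc stage: start task is not a table source");
        }
        return source;
    }
}
```

A chain like source, filter, projection passes. A stage where a join reads from two table sources has two start tasks and is rejected. This model uses an exception, whereas the real executor reports the problem through the assertion you hit.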

The Postgres platform reuses this executor without overriding that restriction, 
so a join between two JDBC sources fails with the "multiple sources are not 
currently supported" assertion you observed. Supporting multi-source JDBC 
operators would require extending the executor (and probably the stage-building 
logic) to accept more than one start task and to compose the resulting SQL 
accordingly. 
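
Here is a rough sketch of the "compose the resulting SQL" part, assuming a plain inner equi-join between two table sources. With two start tasks, each source needs its own alias. That matters for your test in particular: both key descriptors name `imdb_top_1000`, and SQL cannot list the same table twice in FROM without aliases. `TwoSourceJoinSql`, `Source` and `compose` are made-up names, not existing Wayang code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT part of Wayang: composes one SQL query for a stage
// whose join reads from two table sources.
final class TwoSourceJoinSql {

    // One table source: table name plus the projected columns.
    static final class Source {
        final String table;
        final List<String> columns;

        Source(String table, List<String> columns) {
            this.table = table;
            this.columns = columns;
        }
    }

    // Each source gets its own alias (t0, t1), so a self-join on the same table
    // stays unambiguous. Identifiers are concatenated without quoting or
    // validation, so only trusted names should be passed in.
    static String compose(Source left, String leftKey, Source right, String rightKey) {
        List<String> select = new ArrayList<>();
        for (String column : left.columns) {
            select.add("t0." + column);
        }
        for (String column : right.columns) {
            select.add("t1." + column);
        }
        return "SELECT " + String.join(", ", select)
                + " FROM " + left.table + " AS t0"
                + " JOIN " + right.table + " AS t1"
                + " ON t0." + leftKey + " = t1." + rightKey;
    }
}
```

Under this approach, per-source filters would become WHERE conditions qualified with the matching alias. The result columns come back as the left source's columns followed by the right source's, which is the order needed to split each row into the two records of a `Tuple2<Record, Record>`.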

It would be great if you extended the JDBC connector to support multiple 
sources!

Best,
 —Alex


---
NovaTechFlow
https://www.novatechflow.com/

> On Dec 12, 2025, at 15:58, Treykorn, Felix 
> <[email protected]> wrote:
> 
> Hello everyone,
> We are three students from the Hasso-Plattner-Institute in Potsdam who are 
> working on a contribution to the Wayang project. We are getting help from Zoi 
> Kaoudi when we have questions, and she pointed us to the dev list for this 
> one. In our work we noticed that the JdbcExecutor does not yet support 
> multiple sources for an operator. For example, the following code throws 
> "Invalid jdbc stage: multiple sources are not currently supported":
> 
> @Test
> void testJoinDbSources() {
>   WayangContext wayangContext = getTestWayangContext()
>           .withPlugin(Java.basicPlugin())
>           .withPlugin(Spark.basicPlugin())
>           .withPlugin(Postgres.plugin());
> 
>   // Two logical sources over the same table.
>   TableSource table1 = new PostgresTableSource("imdb_top_1000",
>           "series_title", "released_year", "certificate", "runtime", "genre",
>           "imdb_rating", "overview", "meta_score", "director", "star1", "star2",
>           "star3", "star4", "no_of_votes", "gross");
>   TableSource table2 = new PostgresTableSource("imdb_top_1000",
>           "series_title", "released_year", "certificate", "runtime", "genre",
>           "imdb_rating", "overview", "meta_score", "director", "star1", "star2",
>           "star3", "star4", "no_of_votes", "gross");
> 
>   // Join on series_title
>   JoinOperator<Record, Record, String> joinOperator =
>           new JoinOperator<>(
>                   record -> record.getString(0),
>                   record -> record.getString(0),
>                   Record.class,
>                   Record.class,
>                   String.class
>           );
> 
>   joinOperator.getKeyDescriptor0()
>           .withSqlImplementation("imdb_top_1000", "series_title");
>   joinOperator.getKeyDescriptor1()
>           .withSqlImplementation("imdb_top_1000", "series_title");
> 
>   joinOperator.addTargetPlatform(Postgres.platform());
> 
>   // Wire up both DB sources as inputs to the join.
>   table1.connectTo(0, joinOperator, 0);
>   table2.connectTo(0, joinOperator, 1);
> 
>   // Collect results.
>   Collection<Tuple2<Record, Record>> collector = new ArrayList<>();
>   LocalCallbackSink<Tuple2<Record, Record>> sink =
>           LocalCallbackSink.createCollectingSink(
>                   collector,
>                   DataSetType.createDefaultUnchecked(Tuple2.class)
>           );
>   joinOperator.connectTo(0, sink, 0);
>   // Execute the plan.
>   wayangContext.execute("PostgreSql join DB-DB", new WayangPlan(sink));
> 
>   // Basic sanity check: we should get at least self-joins.
>   assertFalse(collector.isEmpty(), "Join result should not be empty.");
> }
> 
> 
> For our contribution, we want to be able to use a Join-like operator with 
> multiple sources on the Postgres platform. Is there already existing code 
> that extends the JdbcExecutor with this functionality?
> 
> Best,
> Anton, Max and Felix
