In the meantime, maybe you can look at the implementation we used during our thesis? It is not complete by any stretch, but it worked well enough for the join-order benchmark:
https://github.com/Mikkel-MJ/incubator-wayang-thesis/blob/main/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java

On 2025/12/12 14:58:20 "Treykorn, Felix" wrote:
> Hello everyone,
> we are three students from Hasso-Plattner-Institute, Potsdam who are working
> on a contribution to the Wayang project. We are getting help from Zoi Kaoudi
> when we have questions and she pointed us to the dev list with this one. In
> our work we noticed, that the JdbcExecutor does not yet support multiple
> sources for an Operator. E.g. the following code throws "Invalid jdbc stage:
> multiple sources are not currently supported"
>
> @Test
> void testJoinDbSources() {
>     WayangContext wayangContext = getTestWayangContext()
>             .withPlugin(Java.basicPlugin())
>             .withPlugin(Spark.basicPlugin())
>             .withPlugin(Postgres.plugin());
>
>     // Two logical sources over the same table.poster_link
>     TableSource table1 = new PostgresTableSource("imdb_top_1000",
>             "series_title", "released_year", "certificate", "runtime", "genre",
>             "imdb_rating", "overview", "meta_score", "director", "star1", "star2",
>             "star3", "star4", "no_of_votes", "gross");
>     TableSource table2 = new PostgresTableSource("imdb_top_1000",
>             "series_title", "released_year", "certificate", "runtime", "genre",
>             "imdb_rating", "overview", "meta_score", "director", "star1", "star2",
>             "star3", "star4", "no_of_votes", "gross");
>
>     // Join on series_title
>     JoinOperator<Record, Record, String> joinOperator =
>             new JoinOperator<>(
>                     record -> record.getString(0),
>                     record -> record.getString(0),
>                     Record.class,
>                     Record.class,
>                     String.class
>             );
>
>     joinOperator.getKeyDescriptor0()
>             .withSqlImplementation("imdb_top_1000", "series_title");
>     joinOperator.getKeyDescriptor1()
>             .withSqlImplementation("imdb_top_1000", "series_title");
>
>     joinOperator.addTargetPlatform(Postgres.platform());
>
>     // Wire up both DB sources as inputs to the join.
>     table1.connectTo(0, joinOperator, 0);
>     table2.connectTo(0, joinOperator, 1);
>
>     // Collect results.
>     Collection<Tuple2<Record, Record>> collector = new ArrayList<>();
>     LocalCallbackSink<Tuple2<Record, Record>> sink =
>             LocalCallbackSink.createCollectingSink(
>                     collector,
>                     DataSetType.createDefaultUnchecked(Tuple2.class)
>             );
>     joinOperator.connectTo(0, sink, 0);
>
>     // Execute the plan.
>     wayangContext.execute("PostgreSql join DB-DB", new WayangPlan(sink));
>
>     // Basic sanity check: we should get at least self-joins.
>     assertFalse(collector.isEmpty(), "Join result should not be empty.");
> }
>
> For our contribution we want to be able to use an operator similar to the
> Join on the Postgres Platform with multiple sources. Is there already
> existing code, that extends the JdbcExecutor() with this functionality?
>
> Best,
> Anton, Max and Felix
