Hello everyone,
We are three students from the Hasso-Plattner-Institute, Potsdam, who are
working on a contribution to the Wayang project. Zoi Kaoudi has been helping us
with our questions, and she pointed us to the dev list for this one. In our
work we noticed that the JdbcExecutor does not yet support multiple sources for
an operator. For example, the following test throws "Invalid jdbc stage:
multiple sources are not currently supported":

    @Test
    void testJoinDbSources() {
        WayangContext wayangContext = getTestWayangContext()
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin())
                .withPlugin(Postgres.plugin());

        // Two logical sources over the same table.
        TableSource table1 = new PostgresTableSource("imdb_top_1000",
                "series_title", "released_year", "certificate", "runtime", "genre",
                "imdb_rating", "overview", "meta_score", "director", "star1", "star2",
                "star3", "star4", "no_of_votes", "gross");
        TableSource table2 = new PostgresTableSource("imdb_top_1000",
                "series_title", "released_year", "certificate", "runtime", "genre",
                "imdb_rating", "overview", "meta_score", "director", "star1", "star2",
                "star3", "star4", "no_of_votes", "gross");

        // Join on series_title (column 0 of both sources).
        JoinOperator<Record, Record, String> joinOperator =
                new JoinOperator<>(
                        record -> record.getString(0),
                        record -> record.getString(0),
                        Record.class,
                        Record.class,
                        String.class
                );
        joinOperator.getKeyDescriptor0()
                .withSqlImplementation("imdb_top_1000", "series_title");
        joinOperator.getKeyDescriptor1()
                .withSqlImplementation("imdb_top_1000", "series_title");
        joinOperator.addTargetPlatform(Postgres.platform());

        // Wire up both DB sources as inputs to the join.
        table1.connectTo(0, joinOperator, 0);
        table2.connectTo(0, joinOperator, 1);

        // Collect results.
        Collection<Tuple2<Record, Record>> collector = new ArrayList<>();
        LocalCallbackSink<Tuple2<Record, Record>> sink =
                LocalCallbackSink.createCollectingSink(
                        collector,
                        DataSetType.createDefaultUnchecked(Tuple2.class)
                );
        joinOperator.connectTo(0, sink, 0);

        // Execute the plan.
        wayangContext.execute("PostgreSql join DB-DB", new WayangPlan(sink));

        // Basic sanity check: every row joins at least with itself.
        assertFalse(collector.isEmpty(), "Join result should not be empty.");
    }

For our contribution, we want to be able to run an operator similar to the join
on the Postgres platform with multiple sources. Is there existing code that
extends the JdbcExecutor with this functionality?
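
To make the goal more concrete, here is a rough sketch of the single SQL
statement we would expect a stage with two table sources and one join to
produce. This is only an illustration under our own assumptions: the class
JoinSqlSketch, the method buildJoinSql, and the aliases t0/t1 are hypothetical
and not part of Wayang, and the sketch does not touch the JdbcExecutor itself.
It also assumes that table and column names are trusted and need no quoting.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, NOT part of Wayang: composes the one SQL statement that
// a JDBC stage with two table sources feeding a join would have to execute.
final class JoinSqlSketch {

    // Prefixes every column with the given alias, e.g. "t0.series_title".
    static String project(String alias, List<String> columns) {
        return columns.stream()
                .map(column -> alias + "." + column)
                .collect(Collectors.joining(", "));
    }

    // Builds an inner equi-join over two sources. The fixed aliases t0 and t1
    // (our assumption) keep the statement valid when both sources read the
    // same table. Identifiers are assumed to be trusted and are not quoted.
    static String buildJoinSql(String leftTable, List<String> leftColumns, String leftKey,
                               String rightTable, List<String> rightColumns, String rightKey) {
        return "SELECT " + project("t0", leftColumns) + ", " + project("t1", rightColumns)
                + " FROM " + leftTable + " AS t0"
                + " JOIN " + rightTable + " AS t1"
                + " ON t0." + leftKey + " = t1." + rightKey;
    }

    public static void main(String[] args) {
        // Shortened column list; the test above projects more columns.
        List<String> columns = List.of("series_title", "released_year");
        System.out.println(buildJoinSql(
                "imdb_top_1000", columns, "series_title",
                "imdb_top_1000", columns, "series_title"));
    }
}
```

The aliases matter for our test case in particular: both sources read
imdb_top_1000, so without them the column references in the join condition
would be ambiguous.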
Best,
Anton, Max and Felix