gknz opened a new pull request, #750:
URL: https://github.com/apache/wayang/pull/750
## Hey everyone!
In this PR, we introduce a database-level table sink operator that allows
Wayang pipelines to complete their entire execution inside the database,
without materializing intermediate data in Java or Spark. It builds on the
existing `SqlQueryChannel` infrastructure and fills a gap in the JDBC operator
ecosystem: while `JdbcFilterOperator`, `JdbcJoinOperator`, and
`JdbcProjectionOperator` already allow composing SQL queries inside the
database, there was no sink operator capable of keeping the results there too.
As a result, every in-database pipeline was forced to cross the
`SqlToStreamOperator` boundary and materialize data in Java/Spark memory before
writing it back via JDBC `INSERT`.
The new operator wraps the composed SQL query in `CREATE TABLE x AS SELECT
...` (overwrite mode) or `INSERT INTO x SELECT ...` (append mode), executing
the entire pipeline as a single SQL statement on the database connection. For
pipelines where all operations can be expressed in SQL, this operator can
eliminate unnecessary data movement and serialization overhead.
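The sketch below contrasts the two paths to make the saved data movement concrete. It uses plain JDBC and is not Wayang code. `SinkPathComparison`, `copyViaJvm`, `copyInDatabase`, and `insertTemplate` are hypothetical names. The real pre-PR path goes through `SqlToStreamOperator` and a Java/Spark sink, not a hand-written copy loop.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;

/** Hypothetical comparison of the two sink paths (simplified, not Wayang code). */
final class SinkPathComparison {

    private SinkPathComparison() {}

    /** Old-style path: every row is pulled into the JVM and written back. */
    static int copyViaJvm(Connection conn, String selectSql, String table, int columnCount)
            throws SQLException {
        int rows = 0;
        try (Statement select = conn.createStatement();
             ResultSet rs = select.executeQuery(selectSql);
             PreparedStatement insert = conn.prepareStatement(insertTemplate(table, columnCount))) {
            while (rs.next()) {
                // Copy each column value from the result row into the INSERT parameters.
                for (int i = 1; i <= columnCount; i++) {
                    insert.setObject(i, rs.getObject(i));
                }
                insert.addBatch();
                rows++;
            }
            insert.executeBatch();
        }
        return rows;
    }

    /** In-database path: a single statement; assumes the target table already exists. */
    static void copyInDatabase(Connection conn, String selectSql, String table) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("INSERT INTO " + table + " " + selectSql);
        }
    }

    /** Builds a parameterized INSERT with one placeholder per column. */
    static String insertTemplate(String table, int columnCount) {
        return "INSERT INTO " + table + " VALUES ("
                + String.join(", ", Collections.nCopies(columnCount, "?")) + ")";
    }

    public static void main(String[] args) {
        System.out.println(insertTemplate("completed_only", 2));
        // INSERT INTO completed_only VALUES (?, ?)
    }
}
```

In the first method, every row travels to the client and back. In the second, the database evaluates `INSERT INTO ... SELECT` internally, which is the effect the new operator is after.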
### What this PR adds
**Abstract operator** (`wayang-jdbc-template`)
- `JdbcTableSinkOperator` — extends the logical `TableSink<Record>` and
implements `JdbcExecutionOperator`. Contributes `CREATE TABLE x AS` (overwrite)
or `INSERT INTO x` (append) to the composed SQL. Accepts `SqlQueryChannel`
input from its platform, declares no output channel (terminal operator).
- `createSqlSuffix()` — an extension point for dialect-specific syntax that
returns an empty string by default. Production databases (PostgreSQL, SQLite,
MySQL) work with the default. Subclasses can override when a specific database
requires additional syntax (this is the case for HSQLDB, which we are using for
the unit tests).
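The sketch below shows one way the clause and suffix hooks could compose with the SELECT produced upstream. It assumes the final statement is simply `clause + select + suffix`. The method names mirror the ones mentioned in this PR. However, `SinkClauseSketch`, `HsqldbSinkClauseSketch`, their constructors, and `compose()` are hypothetical stand-ins, not the actual `JdbcTableSinkOperator` API.

```java
/** Hypothetical stand-in for the sink's SQL-generation hooks (not the real operator). */
class SinkClauseSketch {
    protected final String tableName;
    protected final boolean overwrite;

    SinkClauseSketch(String tableName, boolean overwrite) {
        this.tableName = tableName;
        this.overwrite = overwrite;
    }

    /** Prefix placed before the composed SELECT. */
    String createSqlClause() {
        return overwrite
                ? "CREATE TABLE " + tableName + " AS "
                : "INSERT INTO " + tableName + " ";
    }

    /** Dialect hook: empty by default. */
    String createSqlSuffix() {
        return "";
    }

    /** Assumed composition step: clause + SELECT + suffix. */
    final String compose(String selectQuery) {
        return createSqlClause() + selectQuery + createSqlSuffix();
    }
}

/** HSQLDB-style override producing CREATE TABLE x AS (SELECT ...) WITH DATA. */
class HsqldbSinkClauseSketch extends SinkClauseSketch {
    HsqldbSinkClauseSketch(String tableName, boolean overwrite) {
        super(tableName, overwrite);
    }

    @Override
    String createSqlClause() {
        // Only overwrite mode needs the parenthesized form; append is unchanged.
        return overwrite ? "CREATE TABLE " + tableName + " AS (" : super.createSqlClause();
    }

    @Override
    String createSqlSuffix() {
        return overwrite ? ") WITH DATA" : super.createSqlSuffix();
    }
}

class SinkClauseSketchDemo {
    public static void main(String[] args) {
        String select = "SELECT * FROM src";
        System.out.println(new SinkClauseSketch("dst", true).compose(select));
        // CREATE TABLE dst AS SELECT * FROM src
        System.out.println(new HsqldbSinkClauseSketch("dst", true).compose(select));
        // CREATE TABLE dst AS (SELECT * FROM src) WITH DATA
    }
}
```

Because the default suffix is empty, PostgreSQL, SQLite, and MySQL need no override. A dialect such as HSQLDB changes only the two hooks.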
**Concrete platform operators** (thin wrappers, one per existing JDBC
platform)
- `PostgresTableSinkOperator` (`wayang-postgres`) — implements
`PostgresExecutionOperator`
- `Sqlite3TableSinkOperator` (`wayang-sqlite3`) — overrides `getPlatform()`
to return `Sqlite3Platform`
- `GenericJdbcTableSinkOperator` (`wayang-generic-jdbc`) — implements
`GenericJdbcExecutionOperator`
Each follows the exact pattern of the existing filter/join/projection
operators on that platform.
**Mappings** (one per platform)
- `TableSinkMapping` in `wayang-postgres`, `wayang-sqlite3`, and
`wayang-generic-jdbc` — transforms the logical `TableSink` into the
corresponding platform-specific execution operator. Registered in each
platform's `Mappings.java`.
**Executor modification**
(`wayang-jdbc-template/execution/JdbcExecutor.java`)
- New `executeSinkStage()` method — detects when the terminal task of a
stage is a `JdbcTableSinkOperator`, composes the SQL from the stage's operators
using the existing `createSqlString()` method, and executes it directly on the
database connection instead of storing it in a channel for downstream
consumption. Handles overwrite mode by running `DROP TABLE IF EXISTS` as a
separate statement before the main query to avoid multi-statement JDBC issues.
- The existing non-sink path is left unchanged; the new logic only activates
when a sink operator is present.
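The overwrite handling described above can be sketched with plain `java.sql` calls. This is an illustration under assumptions, not the actual `executeSinkStage()` body. `SinkStageSketch`, `planSinkStatements`, and `runStatements` are hypothetical. The real executor derives the composed SQL from the stage's operators via `createSqlString()`.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the sink-stage flow using plain JDBC (not Wayang's executor). */
final class SinkStageSketch {

    private SinkStageSketch() {}

    /** Plans the statements for a sink stage; each entry becomes its own JDBC call. */
    static List<String> planSinkStatements(String table, boolean overwrite, String composedSql) {
        List<String> statements = new ArrayList<>();
        if (overwrite) {
            // Issued separately so the driver never receives a multi-statement string.
            statements.add("DROP TABLE IF EXISTS " + table);
        }
        statements.add(composedSql);
        return statements;
    }

    /** Executes each planned statement on its own Statement object. */
    static void runStatements(Connection connection, List<String> statements) throws SQLException {
        for (String sql : statements) {
            try (Statement stmt = connection.createStatement()) {
                stmt.execute(sql);
            }
        }
    }

    public static void main(String[] args) {
        String composed = "CREATE TABLE completed_only AS "
                + "SELECT * FROM statustype WHERE st_name = 'Completed'";
        planSinkStatements("completed_only", true, composed).forEach(System.out::println);
    }
}
```

Issuing `DROP TABLE IF EXISTS` through its own `Statement.execute` call avoids relying on multi-statement support within a single SQL string. Not every JDBC driver enables that by default.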
**Unit tests** (`JdbcTableSinkExecutorTest.java`)
- `testOverwriteModeCreatesNewTable` — end-to-end execution against HSQLDB:
creates a source table with three rows, runs the operator in overwrite mode,
queries the target table back to verify its contents.
- `testOverwriteModeReplacesExistingTable` — verifies the drop-then-create
flow by starting with a pre-existing target table of a different schema.
- `testAppendModeInsertsIntoExistingTable` — verifies that append mode
preserves existing data and adds new rows.
- `testOverwriteClauseGeneration`, `testAppendClauseGeneration` — unit tests
for `createSqlClause()` output in both modes.
- `HsqldbTableSinkOperator` — test-only sibling that overrides
`createSqlClause`/`createSqlSuffix` for HSQLDB's parenthesized `CREATE TABLE x
AS (SELECT ...) WITH DATA` syntax. This demonstrates the dialect-override
extension point and keeps production SQL generation clean.
### Example usage
```java
// Assumes `wayangContext` is a WayangContext with the Postgres plugin registered.
PostgresTableSource source = new PostgresTableSource("statustype", "st_id", "st_name");

// The lambda serves Java/Spark; the SQL fragment is used when the filter runs in Postgres.
FilterOperator<Record> filter = new FilterOperator<>(
        new PredicateDescriptor<>(
                record -> record.getField(1).toString().equals("Completed"),
                Record.class
        ).withSqlImplementation("st_name = 'Completed'"),
        DataSetType.createDefault(Record.class)
);

// Overwrite mode: (re)create table "completed_only" with columns st_id, st_name.
TableSink<Record> sink = new TableSink<>(null, "overwrite",
        "completed_only", "st_id", "st_name");

source.connectTo(0, filter, 0);
filter.connectTo(0, sink, 0);
wayangContext.execute("In-database sink", new WayangPlan(sink));
```
With the Postgres plugin registered, the optimizer selects
`PostgresTableSource`, `PostgresFilterOperator`, and
`PostgresTableSinkOperator`. The composed SQL executed against the database is:
```sql
CREATE TABLE completed_only AS SELECT * FROM statustype WHERE st_name = 'Completed'
```
The entire pipeline is a single SQL statement. No data crosses the JVM
boundary. Verified end-to-end against a PostgreSQL 16 instance.
### Known limitation (join output type)
During development, we ran into a pre-existing architectural mismatch between
Wayang's logical type system and SQL semantics.
`JoinOperator` declares its output type as `Tuple2<Record, Record>`, which is
correct for Java/Spark join semantics but mismatched with SQL's flat-row
output. Because type checking occurs at `Operator.connectTo()` during plan
construction — before the optimizer selects execution platforms — any operator
chained after a join must accept `Tuple2`, which forces users to insert a
manual `MapOperator` that flattens the pair into a single record.
For Java/Spark pipelines, this is inconvenient but works. For the new
in-database sink, the required flatten uses a Java lambda, which forces a
`SqlToStreamOperator` boundary and breaks the in-database chain. This was
previously invisible because every in-database pipeline terminated at
`SqlToStreamOperator` anyway, which converts flat SQL rows into
`Stream<Record>` — effectively discarding the `Tuple2` abstraction at runtime.
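The sketch below shows what the manual flatten step has to do. It uses minimal stand-in types rather than Wayang's `Tuple2` and `Record`; `FlattenSketch`, `Pair`, and `flatten` are hypothetical. This logic lives in a Java lambda, so the JDBC platform cannot translate it into SQL. That is why it forces the `SqlToStreamOperator` boundary, even though a SQL join already returns the flat row.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-ins (not Wayang's Tuple2/Record) illustrating the flatten step. */
final class FlattenSketch {

    /** Stand-in for a two-sided join result, analogous to Tuple2<Record, Record>. */
    record Pair<A, B>(A field0, B field1) {}

    private FlattenSketch() {}

    /** Concatenates the left and right rows into one flat row, as a SQL join would. */
    static List<Object> flatten(Pair<List<Object>, List<Object>> joined) {
        List<Object> flat = new ArrayList<>(joined.field0());
        flat.addAll(joined.field1());
        return flat;
    }

    public static void main(String[] args) {
        Pair<List<Object>, List<Object>> joined =
                new Pair<>(List.of(1, "Completed"), List.of(1, "row-a"));
        System.out.println(flatten(joined)); // [1, Completed, 1, row-a]
    }
}
```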
This PR does not resolve this. Pipelines of the form `Source → Filter →
Projection → Sink` work fully in-database, but pipelines including joins still
require a Java boundary after the join. A separate discussion about introducing
a no-op `JdbcFlattenOperator` (or an equivalent mechanism) has been raised on
the dev list. This PR intentionally stays scoped to the sink operator.
### Future work
- **Fluent API.** Neither the existing `JavaTableSink`/`SparkTableSink` nor
the new JDBC sink has a `writeTable()` method in `DataQuanta.scala` /
`DataQuantaBuilder.scala`. Adding it would benefit all three implementations
and is a small, self-contained change. We plan to open a follow-up PR for this
once this one lands.
- **Join → sink in-database.** As noted above, contingent on the broader
discussion about the `Tuple2` vs `Record` type mismatch.
### Testing summary
- Five unit tests covering overwrite, append, and clause generation, all
passing against HSQLDB.
- End-to-end verified against a real PostgreSQL 16 instance with source →
sink and source → filter → sink pipelines.
- Non-sink JDBC stages are unaffected: the existing `JdbcExecutorTest` suite
passes unchanged.
Thank you!
--
This is an automated message from the Apache Git Service.