gknz opened a new pull request, #750:
URL: https://github.com/apache/wayang/pull/750

   ## Hey everyone!
   
   In this PR, we introduce a database-level table sink operator that allows 
Wayang pipelines to complete their entire execution inside the database, 
without materializing intermediate data in Java or Spark. It builds on the 
existing `SqlQueryChannel` infrastructure and fills a gap in the JDBC operator 
ecosystem: while `JdbcFilterOperator`, `JdbcJoinOperator`, and 
`JdbcProjectionOperator` already allow composing SQL queries inside the 
database, there was no sink operator capable of keeping the results there too. 
As a result, every in-database pipeline was forced to cross the 
`SqlToStreamOperator` boundary and materialize data in Java/Spark memory before 
writing it back via JDBC `INSERT`.
   
   The new operator wraps the composed SQL query in `CREATE TABLE x AS SELECT 
...` (overwrite mode) or `INSERT INTO x SELECT ...` (append mode), executing 
the entire pipeline as a single SQL statement on the database connection. For 
pipelines where all operations can be expressed in SQL, this operator can 
eliminate unnecessary data movement and serialization overhead.
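
As a rough illustration of the wrapping described above, the following minimal Java sketch prefixes an already composed `SELECT` according to the write mode. The names `SinkSqlSketch`, `Mode`, and `wrap` are hypothetical and exist only for this example; they are not the PR's actual classes or methods.

```java
// Hypothetical sketch; not the operator code from this PR.
final class SinkSqlSketch {

    /** The two write modes described in the PR text. */
    enum Mode { OVERWRITE, APPEND }

    /**
     * Wraps a composed SELECT so that its result stays inside the database.
     * The table name is concatenated as-is, so it is assumed to be trusted.
     */
    static String wrap(String composedSelect, String targetTable, Mode mode) {
        switch (mode) {
            case OVERWRITE:
                return "CREATE TABLE " + targetTable + " AS " + composedSelect;
            case APPEND:
                return "INSERT INTO " + targetTable + " " + composedSelect;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

Calling `SinkSqlSketch.wrap(select, "completed_only", SinkSqlSketch.Mode.APPEND)` would yield an `INSERT INTO completed_only SELECT ...` statement for whatever `select` string the upstream operators composed.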
   
   ### What this PR adds
   
   **Abstract operator** (`wayang-jdbc-template`)
   - `JdbcTableSinkOperator` — extends the logical `TableSink<Record>` and 
implements `JdbcExecutionOperator`. Contributes `CREATE TABLE x AS` (overwrite) 
or `INSERT INTO x` (append) to the composed SQL. Accepts `SqlQueryChannel` 
input from its platform, declares no output channel (terminal operator).
   - `createSqlSuffix()` — an extension point for dialect-specific syntax that 
returns an empty string by default. Production databases (PostgreSQL, SQLite, 
MySQL) work with the default. Subclasses can override when a specific database 
requires additional syntax (this is the case for HSQLDB, which we are using for 
the unit tests).
   
   **Concrete platform operators** (thin wrappers, one per existing JDBC 
platform)
   - `PostgresTableSinkOperator` (`wayang-postgres`) — implements 
`PostgresExecutionOperator`
   - `Sqlite3TableSinkOperator` (`wayang-sqlite3`) — overrides `getPlatform()` 
to return `Sqlite3Platform`
   - `GenericJdbcTableSinkOperator` (`wayang-generic-jdbc`) — implements 
`GenericJdbcExecutionOperator`
   
   Each follows the exact pattern of the existing filter/join/projection 
operators on that platform.
   
   **Mappings** (one per platform)
   - `TableSinkMapping` in `wayang-postgres`, `wayang-sqlite3`, and 
`wayang-generic-jdbc` — transforms the logical `TableSink` into the 
corresponding platform-specific execution operator. Registered in each 
platform's `Mappings.java`.
   
   **Executor modification** 
(`wayang-jdbc-template/execution/JdbcExecutor.java`)
   - New `executeSinkStage()` method — detects when the terminal task of a 
stage is a `JdbcTableSinkOperator`, composes the SQL from the stage's operators 
using the existing `createSqlString()` method, and executes it directly on the 
database connection instead of storing it in a channel for downstream 
consumption. Handles overwrite mode by running `DROP TABLE IF EXISTS` as a 
separate statement before the main query to avoid multi-statement JDBC issues.
   - The existing non-sink path is left unchanged; the new logic only activates 
when a sink operator is present.
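
The sink-stage flow above can be sketched in plain JDBC terms. This is a simplified illustration under stated assumptions, not the code of `executeSinkStage()`: the class `SinkStageSketch` and its methods `planStatements` and `run` are hypothetical, and the composed `SELECT` is assumed to be available as a string.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the statement ordering; not the JdbcExecutor code.
final class SinkStageSketch {

    /** Lists the statements to run, keeping the DROP separate from the main query. */
    static List<String> planStatements(String composedSelect, String table, boolean overwrite) {
        List<String> statements = new ArrayList<>();
        if (overwrite) {
            // Issued on its own so that no multi-statement string is sent over JDBC.
            statements.add("DROP TABLE IF EXISTS " + table);
            statements.add("CREATE TABLE " + table + " AS " + composedSelect);
        } else {
            statements.add("INSERT INTO " + table + " " + composedSelect);
        }
        return statements;
    }

    /** Executes each planned statement in order on the given connection. */
    static void run(Connection connection, List<String> statements) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            for (String sql : statements) {
                statement.execute(sql);
            }
        }
    }
}
```

Separating planning from execution is only a choice made for this sketch: it lets the statement order be checked without a live database.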
   
   **Unit tests** (`JdbcTableSinkExecutorTest.java`)
   - `testOverwriteModeCreatesNewTable` — end-to-end execution against HSQLDB: 
creates a source table with three rows, runs the operator in overwrite mode, 
queries the target table back to verify data.
   - `testOverwriteModeReplacesExistingTable` — verifies the drop-then-create 
flow by starting with a pre-existing target table of a different schema.
   - `testAppendModeInsertsIntoExistingTable` — verifies that append mode 
preserves existing data and adds new rows.
   - `testOverwriteClauseGeneration`, `testAppendClauseGeneration` — unit tests 
for `createSqlClause()` output in both modes.
   - `HsqldbTableSinkOperator` — test-only sibling that overrides 
`createSqlClause`/`createSqlSuffix` for HSQLDB's parenthesized `CREATE TABLE x 
AS (SELECT ...) WITH DATA` syntax. This demonstrates the dialect-override 
extension point and keeps production SQL generation clean.
   
   ### Example usage
   
   ```java
   PostgresTableSource source = new PostgresTableSource("statustype", "st_id", "st_name");
   
   FilterOperator<Record> filter = new FilterOperator<>(
       new PredicateDescriptor<>(
               record -> record.getField(1).toString().equals("Completed"), Record.class)
           .withSqlImplementation("st_name = 'Completed'"),
       DataSetType.createDefault(Record.class)
   );
   
   TableSink<Record> sink = new TableSink<>(null, "overwrite", "completed_only", "st_id", "st_name");
   
   source.connectTo(0, filter, 0);
   filter.connectTo(0, sink, 0);
   
   wayangContext.execute("In-database sink", new WayangPlan(sink));
   ```
   
   With the Postgres plugin registered, the optimizer selects 
`PostgresTableSource`, `PostgresFilterOperator`, and 
`PostgresTableSinkOperator`. The composed SQL executed against the database is:
   
   ```sql
   CREATE TABLE completed_only AS SELECT * FROM statustype WHERE st_name = 'Completed'
   ```
   
   The entire pipeline is a single SQL statement. No data crosses the JVM 
boundary. Verified end-to-end against a PostgreSQL 16 instance.
   
   ### Known limitation (join output type)
   
   During development, we came across a pre-existing architectural 
tension between Wayang's logical type system and SQL semantics. 
`JoinOperator` declares its output type as `Tuple2<Record, Record>`, which is 
correct for Java/Spark join semantics but mismatched with SQL's flat-row 
output. Because type checking occurs at `Operator.connectTo()` during plan 
construction — before the optimizer selects execution platforms — any operator 
chained after a join must accept `Tuple2`, forcing a manual `MapOperator` to 
flatten.
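
The flatten step mentioned above can be pictured with stand-in types. The sketch below does not use Wayang's `Record` or `Tuple2`; the classes `Row` and `Pair` and the method `flatten` are hypothetical placeholders that only show how a nested pair of rows becomes the single flat row that a SQL join would produce directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; not Wayang's Record or Tuple2 classes.
final class JoinFlattenSketch {

    /** A row of field values. */
    static final class Row {
        final List<Object> fields;

        Row(Object... fields) {
            this.fields = Arrays.asList(fields);
        }
    }

    /** A nested join result: one row from each side. */
    static final class Pair {
        final Row left;
        final Row right;

        Pair(Row left, Row right) {
            this.left = left;
            this.right = right;
        }
    }

    /** Concatenates both sides into one flat row, left fields first. */
    static Row flatten(Pair pair) {
        List<Object> merged = new ArrayList<>(pair.left.fields);
        merged.addAll(pair.right.fields);
        return new Row(merged.toArray());
    }
}
```

In SQL the joined row is already flat, so a step like `flatten` carries no information there; it exists only to satisfy the nested type on the Java side.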
   
   For Java/Spark pipelines, this is inconvenient but works. For the new 
in-database sink, the required flatten uses a Java lambda, which forces a 
`SqlToStreamOperator` boundary and breaks the in-database chain. This was 
previously invisible because every in-database pipeline terminated at 
`SqlToStreamOperator` anyway, which converts flat SQL rows into 
`Stream<Record>` — effectively discarding the `Tuple2` abstraction at runtime.
   
   This PR does not resolve the mismatch. Pipelines of the form `Source → Filter → 
Projection → Sink` work fully in-database, but pipelines including joins still 
require a Java boundary after the join. A separate discussion about introducing 
a no-op `JdbcFlattenOperator` (or an equivalent mechanism) has been raised on 
the dev list. This PR intentionally stays scoped to the sink operator.
   
   ### Future work
   
   - **Fluent API.** Neither the existing `JavaTableSink`/`SparkTableSink` nor 
the new JDBC sink has a `writeTable()` method in `DataQuanta.scala` / 
`DataQuantaBuilder.scala`. Adding it would benefit all three implementations 
and is a small, self-contained change. We plan a follow-up PR for this once 
this one lands.
   - **Join → sink in-database.** As noted above, contingent on the broader 
discussion about the `Tuple2` vs `Record` type mismatch.
   
   ### Testing summary
   
   - Five unit tests covering overwrite, append, and clause generation, all 
passing against HSQLDB.
   - End-to-end verified against a real PostgreSQL 16 instance with source → 
sink and source → filter → sink pipelines.
   - Non-sink JDBC stages are unaffected: the existing `JdbcExecutorTest` suite 
passes unchanged.
   
   Thank you!

