Ruiii-w commented on PR #10468:
URL: https://github.com/apache/seatunnel/pull/10468#issuecomment-4002426049
## Purpose of this pull request
This pull request adds support for the PostgreSQL `COPY` protocol in the
JDBC Source connector. Compared to standard `SELECT` queries, `COPY (SELECT
...) TO STDOUT` provides significantly better throughput for bulk data
extraction. The feature integrates with SeaTunnel's split (sharding) logic to
preserve parallel reads and supports both CSV and Binary formats.
Note: The feature was developed against SeaTunnel 2.3.8 and has been merged
into `dev`; functionality remains stable.
---
## Improvements Based on PR Review
This implementation has been **enhanced** based on the code review feedback.
Key improvements include:
| Issue | Resolution |
| ------------------------------ | ------------------------------------------------------------ |
| **Feature Integration** | COPY functionality now integrated into `JdbcInputFormat` data reading flow |
| **Thread Safety** | Static variables in `PgCopyBinaryReader` changed to instance variables |
| **SQL Injection Prevention** | Added SQL validation before COPY execution |
| **Exactly-Once Compatibility** | Added validation to prevent COPY with Exactly-Once semantics |
| **ChunkSplitter Interface** | Added configuration validation for COPY with fixed partitioning only |
| **Memory Safety** | Implemented bounded buffer expansion with upper limits |
| **Exception Handling** | Enhanced resource cleanup with proper logging |
| **Test Coverage** | Added unit tests and E2E tests for PostgreSQL COPY |
| **Documentation** | Unified default values between code and documentation |
---
## Key Features
- **COPY-driven reads**: `COPY (SELECT ...) TO STDOUT` for PostgreSQL
- **Parallel-friendly**: COPY is applied per split generated by SeaTunnel
- **CSV and Binary formats**: Both formats supported with dedicated parsing
(Binary mode uses a fast, robust reader)
- **New JDBC Source options**: PostgreSQL-only options: `copy.enabled`,
`copy.binary`, `copy.buffer_size`
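
To make the per-split idea concrete, here is a minimal sketch of wrapping each split's `SELECT` in its own `COPY` statement. The class `PerSplitCopySql`, the numeric `id` column and the range bounds are hypothetical and only illustrate the shape of the SQL; in this PR the split SQL comes from SeaTunnel's `chunkSplitter`, not from code like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class PerSplitCopySql {
    private PerSplitCopySql() {}

    /** Wraps one SELECT in a COPY statement, using the same format string as the PR. */
    static String toCopySql(String selectSql, boolean binary) {
        return String.format("COPY (%s) TO STDOUT WITH %s", selectSql, binary ? "BINARY" : "CSV");
    }

    /**
     * Builds one COPY statement per half-open range [bounds[i], bounds[i + 1])
     * on a numeric column. Table and column names are assumed to be trusted.
     */
    static List<String> copyPerSplit(String table, String column, long[] bounds, boolean binary) {
        List<String> statements = new ArrayList<>();
        for (int i = 0; i + 1 < bounds.length; i++) {
            String select = String.format(
                    Locale.ROOT,
                    "SELECT * FROM %s WHERE %s >= %d AND %s < %d",
                    table, column, bounds[i], column, bounds[i + 1]);
            statements.add(toCopySql(select, binary));
        }
        return statements;
    }
}
```

Each resulting statement can be executed by a separate reader, which is how COPY stays compatible with parallel reads.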
---
## User-Facing Changes
Yes. Optional JDBC Source options are added for PostgreSQL. Defaults remain
unchanged; existing jobs are not affected.
### New Options
| Name | Type | Required | Default | Description |
| ------------------ | ------- | -------- | --------- | ------------------------------------------------------------ |
| `copy.enabled` | Boolean | No | `false` | Enable `COPY (SELECT ...) TO STDOUT` for PostgreSQL. Only supported for PostgreSQL. Not compatible with Exactly-Once. |
| `copy.binary` | Boolean | No | `false` | When `copy.enabled = true`, use `COPY ... WITH BINARY`. SeaTunnel parses PostgreSQL COPY binary protocol directly. |
| `copy.buffer_size` | Int | No | `1048576` | Buffer size (bytes) for COPY reading. Allowed range: `65536–10485760`. May be rounded up to a power of 2 at runtime. |
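
The range check and power-of-two rounding described for `copy.buffer_size` could look roughly like the sketch below. `CopyBufferSize` and `normalize` are hypothetical names, not the PR's code. Whether the real implementation caps the rounded value at the upper bound is not stated here, so this sketch leaves it uncapped.

```java
final class CopyBufferSize {
    static final int MIN_BYTES = 65_536;      // lower bound from the options table
    static final int MAX_BYTES = 10_485_760;  // upper bound from the options table

    private CopyBufferSize() {}

    /** Rejects out-of-range values, then rounds up to the next power of two. */
    static int normalize(int configured) {
        if (configured < MIN_BYTES || configured > MAX_BYTES) {
            throw new IllegalArgumentException(
                    "copy.buffer_size must be in [" + MIN_BYTES + ", " + MAX_BYTES
                            + "], got " + configured);
        }
        int highest = Integer.highestOneBit(configured);
        // A power of two is kept as-is; anything else moves up to the next one.
        // Assumption: no cap is applied, so the result may exceed MAX_BYTES.
        return highest == configured ? configured : highest << 1;
    }
}
```

With this sketch the default `1048576` is returned unchanged, while a value such as `100000` becomes `131072`.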
---
## Documentation Details
1. **Data Source Title**: JDBC Source (PostgreSQL COPY Mode)
2. **Connector Support Version**: SeaTunnel 2.3.8+
3. **Data Source Description**: High-throughput PostgreSQL bulk reads via
`COPY (SELECT ...) TO STDOUT`
4. **Supported Engines**: Spark, Flink, SeaTunnel Zeta
5. **Supported Data Source List**: PostgreSQL
6. **Dependencies**: `org.postgresql:postgresql` (standard JDBC driver)
7. **Data Type Mapping**: Reuses existing JDBC PostgreSQL mapping; in binary
mode, SeaTunnel decodes COPY fields directly
8. **Options**: See the "New Options" table above
---
## Example Configuration
```hocon
source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    username = "postgres"
    password = "password"
    # Either table_path or query can be used
    table_path = "public.my_table"
    # query = "select * from public.my_table"
    # Enable COPY mode
    copy.enabled = true
    # Optional: use binary mode
    copy.binary = true
    # Optional: tune buffer size (64KB ~ 10MB)
    copy.buffer_size = 1048576
  }
}
```
---
## Compatibility and Constraints
- **PostgreSQL only**: COPY is activated exclusively for the PostgreSQL
dialect; `copy.*` is ignored for other dialects.
- **Not compatible with Exactly-Once**: A validation prevents enabling COPY
with Exactly-Once semantics.
- **SQL Safety**: COPY SQL is generated from your `query`/`table_path` and
split conditions; unsafe patterns (e.g., `;`, comments, DDL/DML) are rejected
by validation.
- **Backward compatibility**: Feature is disabled by default; non-PostgreSQL
use cases remain unchanged.
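
As an illustration of the SQL safety constraint, the following sketch rejects statement separators, comment markers and common DDL/DML keywords. `CopySqlGuard` and its keyword list are assumptions for illustration; the PR's actual validation rules may differ. The check is deliberately naive and will also reject a harmless query whose string literal happens to contain one of these tokens.

```java
import java.util.regex.Pattern;

final class CopySqlGuard {
    // Whole-word, case-insensitive match for statements that should never
    // appear in a read-only query wrapped by COPY (hypothetical list).
    private static final Pattern FORBIDDEN_KEYWORDS = Pattern.compile(
            "\\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE|GRANT|REVOKE)\\b",
            Pattern.CASE_INSENSITIVE);

    private CopySqlGuard() {}

    /** Returns true only if none of the unsafe patterns is present. */
    static boolean isSafe(String sql) {
        if (sql == null || sql.trim().isEmpty()) {
            return false;
        }
        if (sql.contains(";")) {
            return false; // statement separator
        }
        if (sql.contains("--") || sql.contains("/*")) {
            return false; // line or block comment
        }
        return !FORBIDDEN_KEYWORDS.matcher(sql).find();
    }
}
```

Because `_` counts as a word character, a column such as `update_time` does not trigger the keyword match.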
---
## Key Implementation
- **Dialect-aware path selection in the reader**
**File**: `JdbcInputFormat.java`
```java
if (useCopyStatement) {
    // COPY path: PgCopyInput reads the split
    pgCopyInput = new PgCopyInput(config, jdbcDialect, chunkSplitter,
            splitTableSchema, splitTableId);
    pgCopyInput.open(inputSplit);
    hasNext = pgCopyInput.hasNext();
} else {
    // Default path: regular JDBC statement and ResultSet
    statement = chunkSplitter.generateSplitStatement(inputSplit,
            splitTableSchema);
    resultSet = statement.executeQuery();
    hasNext = resultSet.next();
}
```
- **COPY execution and reader creation**
**File**: `PgCopyInput.java`
```java
// Build the per-split SELECT, then wrap it in COPY ... TO STDOUT
String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
if (useBinary) {
    selectSql = addCastsForBinaryMode(selectSql, tableSchema);
}
String copySql = String.format("COPY (%s) TO STDOUT WITH %s", selectSql,
        useBinary ? "BINARY" : "CSV");
// Open the COPY output as a stream and create the reader on top of it
copyStream = copyManagerProxy.copyOutAsStream(copySql);
reader = createReader(copyStream);
```
- **Binary decoding with robust EOF/truncation checks**
**File**: `PgCopyBinaryReader.java`
```java
// Header completeness checks
if (buffer.remaining() < SIGNATURE.length + 8) {
    if (eof) throw new JdbcConnectorException(..., "Truncated COPY header");
    return;
}
// Field length payload checks
if (buffer.remaining() < 4) {
    if (eof) throw new JdbcConnectorException(..., "Unexpected EOF while reading field length");
    return;
}
```
- **Options and validation**
**Files**: `JdbcSourceOptions.java`, `JdbcSourceConfig.java`
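
For readers unfamiliar with the COPY binary layout that the header and field-length checks above protect, here is a self-contained sketch of a decoder. It follows the format documented by PostgreSQL: an 11-byte signature, a 32-bit flags field, a 32-bit header extension length, then tuples made of a 16-bit field count and length-prefixed fields (`-1` means NULL), ending with a 16-bit `-1` trailer. Unlike `PgCopyBinaryReader`, which reads from a stream and waits for more bytes, this sketch assumes the whole payload is already in memory. `CopyBinarySketch` is a hypothetical name, and a standard exception stands in for `JdbcConnectorException`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CopyBinarySketch {
    // 11-byte signature defined by the PostgreSQL COPY binary format
    static final byte[] SIGNATURE =
            {'P', 'G', 'C', 'O', 'P', 'Y', '\n', (byte) 0xFF, '\r', '\n', 0};

    private CopyBinarySketch() {}

    /** Parses a complete payload; each row holds raw field bytes, null for SQL NULL. */
    static List<List<byte[]>> parse(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data); // big-endian by default, as the format requires
        need(buf, SIGNATURE.length + 8, "Truncated COPY header");
        byte[] sig = new byte[SIGNATURE.length];
        buf.get(sig);
        if (!Arrays.equals(sig, SIGNATURE)) {
            throw new IllegalStateException("Invalid COPY signature");
        }
        buf.getInt();              // flags field, ignored in this sketch
        int extLen = buf.getInt(); // header extension length
        need(buf, extLen, "Truncated COPY header extension");
        buf.position(buf.position() + extLen);

        List<List<byte[]>> rows = new ArrayList<>();
        while (true) {
            need(buf, 2, "Unexpected EOF while reading field count");
            short fieldCount = buf.getShort();
            if (fieldCount == -1) {
                return rows; // file trailer reached
            }
            List<byte[]> row = new ArrayList<>();
            for (int i = 0; i < fieldCount; i++) {
                need(buf, 4, "Unexpected EOF while reading field length");
                int len = buf.getInt();
                if (len == -1) {
                    row.add(null); // NULL field carries no data bytes
                    continue;
                }
                need(buf, len, "Unexpected EOF while reading field data");
                byte[] value = new byte[len];
                buf.get(value);
                row.add(value);
            }
            rows.add(row);
        }
    }

    /** Fails fast when fewer than {@code bytes} remain or the length is negative. */
    private static void need(ByteBuffer buf, int bytes, String message) {
        if (bytes < 0 || buf.remaining() < bytes) {
            throw new IllegalStateException(message);
        }
    }
}
```

The decoder returns raw field bytes only; mapping them to SeaTunnel types is a separate step that this sketch does not cover.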
---
## Testing
- **Unit Tests**
**Files**: `PgCopyBinaryReaderTest.java` and `PgCopyCsvReaderTest.java`
Coverage highlights:
- `testBinaryHeaderParsing`: Header parsing
- `testRowParsing` / `testNullValues` / `testMoreDataTypes` /
`testDateAndTimestamp` / `testByteAAndSmallIntFloat`: Data types and nulls
- `testInvalidSignature` / `testPrematureEof`: Negative paths
- `testBufferExpansion`: Large field triggers controlled buffer growth
- **E2E Tests**
An E2E test is added to `JdbcPostgresIT` by appending
`/jdbc_postgres_source_copy_binary.conf` to `PG_CONFIG_FILE_LIST`; it is executed
by `testAutoGenerateSQL` via `container.executeJob(CONFIG_FILE)`.
---
## Backward Compatibility
- All new options are optional and disabled by default.
- Non-PostgreSQL sources ignore COPY options.
- Explicit incompatibility with Exactly-Once prevents ambiguous behavior.
---
## Performance Benchmarks
### Test Environment 1: Jdbc Source → Doris Sink
**Environment**: VMware x86_64 virtual machine
- **CPU**: Intel Xeon Silver 4210 @ 2.20GHz (16 cores)
- **Memory**: 32GB
- **Storage**: ~128GB virtual disk
- **Architecture**: 2 NUMA nodes, hyperthreading disabled
**Dataset**: 4,513,000 rows, 393 columns
> *All results are from single-threaded performance tests*
#### Native JDBC (SeaTunnel)
- **Total Time**: 3145.9 s (~52 min 26 sec)
- **Average Rate**: 1434.6 rec/s
- **Peak Rate**: 1763.0 rec/s
- **Minimum Rate**: 125.7 rec/s
#### COPY WITH BINARY
- **Total Time**: 2011.5 s (~33 min 32 sec)
- **Average Rate**: 2243.6 rec/s
- **Peak Rate**: 2701.2 rec/s
- **Minimum Rate**: 708.3 rec/s
| Metric | COPY WITH BINARY | Native JDBC | Improvement |
| ------------------------ | ---------------- | ----------- | ----------- |
| Total Time (seconds) | 2011.5 | 3145.9 | **-36%** |
| Average Rate (records/s) | 2243.6 | 1434.6 | **+56%** |
| Peak Rate (records/s) | 2701.2 | 1763.0 | **+53%** |
| Throughput (MB/s) | ~14.7 | ~9.35 | **+57%** |
---
### Test Environment 2: Jdbc Source → Console Sink
**Environment**: High-memory multi-core compute server
- **CPU**: Hygon (64 cores, 128 threads)
- **Memory**: 256GB
- **Storage**: 1024GB SSD
**Dataset**: 4,513,000 rows, 393 columns
> *All results are from single-threaded performance tests*
#### Native JDBC (SeaTunnel)
- **Total Time**: 962.9 s (~16 min 2.9 sec)
- **Average Rate**: 4687 rec/s
- **Peak Rate**: 5262.4 rec/s
- **Minimum Rate**: 1963.7 rec/s
#### COPY WITH BINARY
- **Total Time**: 381.5 s (~6 min 21.5 sec)
- **Average Rate**: 11830.1 rec/s
- **Peak Rate**: 12796.3 rec/s
- **Minimum Rate**: 10081.4 rec/s
| Metric | COPY WITH BINARY | Native JDBC | Improvement |
| ------------------------ | ---------------- | ----------- | ----------- |
| Total Time (seconds) | 381.5 | 962.9 | **-60.4%** |
| Average Rate (records/s) | 11830.1 | 4687 | **+152.4%** |
| Peak Rate (records/s) | 12796.3 | 5262.4 | **+143.2%** |
| Throughput (MB/s) | ~77.1 | ~30.5 | **+152.8%** |
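
The average rates and improvement percentages in both tables can be re-derived from the row count and total times. The helper below is a small sketch for that arithmetic; `BenchmarkMath` is a hypothetical name and is not part of the PR.

```java
final class BenchmarkMath {
    private BenchmarkMath() {}

    /** Average rate in records per second. */
    static double averageRate(long rows, double seconds) {
        return rows / seconds;
    }

    /** Relative change of candidate against baseline, in percent. */
    static double percentChange(double baseline, double candidate) {
        return (candidate - baseline) / baseline * 100.0;
    }
}
```

For example, `averageRate(4_513_000L, 3145.9)` gives the native JDBC rate for Environment 1, and `percentChange(962.9, 381.5)` gives the total-time change for Environment 2.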
---
### Key Findings
- **Binary COPY mode outperformed native JDBC in both environments**, cutting
total execution time by 36% (Environment 1) and 60.4% (Environment 2)
- **Throughput gains were larger on the higher-end server**: about +152% in
Environment 2 versus about +56% in Environment 1
- **Gains are substantial for the wide table tested** (393 columns in the test
dataset)
- **Peak rate reached 12,796.3 rec/s** in Environment 2
---
## Checklist
- [x] Documentation updated (en/zh) for new options and behavior
- [x] Unit tests added/updated, including negative and boundary cases
- [x] E2E tests added/updated for PostgreSQL COPY (including Binary)
- [x] No new external dependencies introduced
- [x] Changes scoped to JDBC Source (PostgreSQL) without breaking public SPI
- [x] Code formatted and basic build passes (`spotless`/`verify`/`tests`)