Flink CDC Issue Import created FLINK-34859:
----------------------------------------------
Summary: [Bug] Oracle CDC in Table API does not support server-time-zone option
Key: FLINK-34859
URL: https://issues.apache.org/jira/browse/FLINK-34859
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import
### Search before asking
- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
### Flink version
1.17.1
### Flink CDC version
3.0.0
### Database and its version
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options
### Minimal reproduce step
## Create a cdc source in table api with `server-time-zone` option specified.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);

Schema schema = Schema.newBuilder()
        .column("NAME", DataTypes.STRING())
        .column("ADDR", DataTypes.STRING())
        .build();

String factoryIdentifier = new OracleTableSourceFactory().factoryIdentifier();
TableDescriptor tableDescriptor = TableDescriptor.forConnector(factoryIdentifier)
        .schema(schema)
        // .format(DebeziumJsonFormatFactory.IDENTIFIER)
        .option(OracleSourceOptions.HOSTNAME, "my-oracle-host")
        .option(OracleSourceOptions.PORT, 1521)
        .option(OracleSourceOptions.USERNAME, "my-oracle-username")
        .option(OracleSourceOptions.PASSWORD, "my-oracle-password")
        .option(OracleSourceOptions.DATABASE_NAME, "my-oracle-database")
        .option(OracleSourceOptions.SCHEMA_NAME, "my-oracle-schema")
        .option(OracleSourceOptions.TABLE_NAME, "TEST")
        .option(OracleSourceOptions.SCAN_STARTUP_MODE, "initial")
        .option(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)
        .option(OracleSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, 10)
        .option(OracleSourceOptions.SERVER_TIME_ZONE, "Asia/Shanghai")
        .option("debezium.include.schema.changes", "false")
        .option("debezium.database.history.store.only.captured.tables.ddl", "true")
        .build();

StreamTableEnvironmentImpl tEnv = (StreamTableEnvironmentImpl) StreamTableEnvironmentImpl.create(
        env, EnvironmentSettings.newInstance().inStreamingMode().build());
Table table = tEnv.from(tableDescriptor);
tEnv.toChangelogStream(table).print();
env.execute();
```
## Exceptions are:
```text
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table '*anonymous_oracle-cdc$1*'.
Table options are:
'connector'='oracle-cdc'
'database-name'='my-oracle-database'
'debezium.database.history.store.only.captured.tables.ddl'='true'
'debezium.include.schema.changes'='false'
'hostname'='my-oracle-host'
'password'='******'
'port'='1521'
'scan.incremental.snapshot.enabled'='false'
'scan.snapshot.fetch.size'='10'
'scan.startup.mode'='initial'
'schema-name'='my-oracle-schema'
'server-time-zone'='Asia/Shanghai'
'table-name'='TEST'
'username'='my-oracle-username'
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:357)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
    at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
    at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:289)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
    at org.codebase.flink.cdc.FlinkOracleCdcTest.main(FlinkOracleCdcTest.java:70)
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'oracle-cdc'.
Unsupported options:
server-time-zone
Supported options:
chunk-key.even-distribution.factor.lower-bound
chunk-key.even-distribution.factor.upper-bound
chunk-meta.group.size
connect.max-retries
connect.timeout
connection.pool.size
connector
database-name
debezium.database.history.store.only.captured.tables.ddl
debezium.include.schema.changes
hostname
password
port
property-version
scan.incremental.close-idle-reader.enabled
scan.incremental.snapshot.backfill.skip
scan.incremental.snapshot.chunk.key-column
scan.incremental.snapshot.chunk.size
scan.incremental.snapshot.enabled
scan.snapshot.fetch.size
scan.startup.mode
schema-name
split-key.even-distribution.factor.lower-bound
split-key.even-distribution.factor.upper-bound
table-name
url
username
    at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:632)
    at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:931)
    at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:955)
    at com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory.createDynamicTableSource(OracleTableSourceFactory.java:70)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164)
    ... 28 more
```
### What did you expect to see?
The program should run and print data
### What did you see instead?
Exception
### Anything else?
![image|https://github.com/ververica/flink-cdc-connectors/assets/23203149/b3bd94c8-6388-4c4d-b614-0084dd262a5c]
![image|https://github.com/ververica/flink-cdc-connectors/assets/23203149/df2dc0b9-c45e-450a-ab41-c6998e7d49ef]
As the screenshots show, `OracleTableSourceFactory` does not support the
`server-time-zone` option, whereas `MySqlTableSourceFactory` does. By the way,
this option can already be specified through the `OracleSourceBuilder` class.
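
To illustrate why the validation fails, here is a minimal, self-contained sketch of the kind of check that the `validateExcept` / `validateUnconsumedKeys` frames in the stack trace suggest. It is a simplified model, not Flink's actual implementation: the class `OptionValidationSketch`, the method `unsupportedKeys`, and the reduced set of declared options are all hypothetical. The assumption is that any provided key that is neither declared by the factory nor covered by a pass-through prefix (here `debezium.`) is reported as unsupported.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of table-factory option validation.
public class OptionValidationSketch {

    /**
     * Returns the provided keys that are neither declared by the factory
     * nor covered by the pass-through prefix (for example "debezium.").
     */
    public static Set<String> unsupportedKeys(
            Map<String, String> provided, Set<String> declared, String passThroughPrefix) {
        Set<String> unsupported = new TreeSet<>();
        for (String key : provided.keySet()) {
            if (!declared.contains(key) && !key.startsWith(passThroughPrefix)) {
                unsupported.add(key);
            }
        }
        return unsupported;
    }

    public static void main(String[] args) {
        // A few of the options from the reproduction above.
        Map<String, String> provided = new LinkedHashMap<>();
        provided.put("connector", "oracle-cdc");
        provided.put("hostname", "my-oracle-host");
        provided.put("server-time-zone", "Asia/Shanghai");
        provided.put("debezium.include.schema.changes", "false");

        // A subset of the "Supported options" listed in the exception.
        Set<String> declared = new TreeSet<>(
                Arrays.asList("connector", "hostname", "port", "username", "password"));

        // server-time-zone is not declared, so it is reported as unsupported.
        System.out.println("before: " + unsupportedKeys(provided, declared, "debezium."));

        // Assumed shape of a fix: the factory also declares the option.
        declared.add("server-time-zone");
        System.out.println("after: " + unsupportedKeys(provided, declared, "debezium."));
    }
}
```

Under this model, the first call reports `server-time-zone` and the second reports nothing, which matches the expectation that declaring the option in the factory's option set would let the reproduction pass validation.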
### Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2977
Created by: [LiuBodong|https://github.com/LiuBodong]
Labels: bug,
Created at: Tue Jan 09 09:24:10 CST 2024
State: open