This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 61ee6f8f45 [flink] Avoid deprecated usages about Configuration (#4584)
61ee6f8f45 is described below
commit 61ee6f8f4526603deae5be44fffb8a0168823565
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Nov 26 14:27:22 2024 +0800
[flink] Avoid deprecated usages about Configuration (#4584)
---
.../org/apache/paimon/benchmark/QueryRunner.java | 2 +-
.../api/common/serialization/SerializerConfig.java | 22 +++++++
.../common/serialization/SerializerConfigImpl.java | 22 +++++++
.../api/common/serialization/SerializerConfig.java | 22 +++++++
.../common/serialization/SerializerConfigImpl.java | 22 +++++++
.../api/common/serialization/SerializerConfig.java | 22 +++++++
.../common/serialization/SerializerConfigImpl.java | 22 +++++++
.../api/common/serialization/SerializerConfig.java | 22 +++++++
.../common/serialization/SerializerConfigImpl.java | 22 +++++++
.../KafkaDebeziumAvroDeserializationSchema.java | 2 +-
.../cdc/mongodb/strategy/MongoVersionStrategy.java | 8 +--
.../flink/action/cdc/mysql/MySqlRecordParser.java | 15 +++--
.../PulsarDebeziumAvroDeserializationSchema.java | 2 +-
.../action/cdc/mongodb/MongodbSchemaITCase.java | 67 ++++++++++------------
.../cdc/mysql/MySqlSyncTableActionITCase.java | 10 +++-
.../flink/sink/cdc/CdcRecordSerializeITCase.java | 28 +++++++--
.../compact/changelog/ChangelogTaskTypeInfo.java | 13 ++++-
.../paimon/flink/sink/CommittableTypeInfo.java | 12 +++-
.../paimon/flink/sink/CompactionTaskTypeInfo.java | 12 +++-
.../org/apache/paimon/flink/sink/FlinkSink.java | 7 +--
.../flink/sink/MultiTableCommittableTypeInfo.java | 12 +++-
.../sink/MultiTableCompactionTaskTypeInfo.java | 13 ++++-
.../paimon/flink/source/FlinkSourceBuilder.java | 16 ++----
.../align/AlignedContinuousFileStoreSource.java | 2 +-
.../paimon/flink/utils/InternalTypeInfo.java | 14 ++++-
.../apache/paimon/flink/utils/JavaTypeInfo.java | 16 +++++-
.../paimon/flink/FileSystemCatalogITCase.java | 3 +-
.../paimon/flink/FlinkJobRecoveryITCase.java | 9 ++-
.../apache/paimon/flink/RescaleBucketITCase.java | 4 +-
.../flink/UnawareBucketAppendOnlyTableITCase.java | 14 ++++-
.../paimon/flink/sink/SinkSavepointITCase.java | 2 +-
.../apache/paimon/flink/util/AbstractTestBase.java | 14 +++--
.../paimon/flink/util/ReadWriteTableTestUtil.java | 26 ++++++---
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 6 +-
34 files changed, 396 insertions(+), 109 deletions(-)
diff --git
a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
index b07cdef846..8bfe4b6c9c 100644
---
a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
+++
b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/QueryRunner.java
@@ -77,7 +77,7 @@ public class QueryRunner {
String sinkPathConfig =
BenchmarkGlobalConfiguration.loadConfiguration()
- .getString(BenchmarkOptions.SINK_PATH);
+ .get(BenchmarkOptions.SINK_PATH);
if (sinkPathConfig == null) {
throw new IllegalArgumentException(
BenchmarkOptions.SINK_PATH.key() + " must be set");
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
new file mode 100644
index 0000000000..16987469a9
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public interface SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
new file mode 100644
index 0000000000..374d33f650
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+/** Placeholder class to resolve compatibility issues. */
+public class SerializerConfigImpl implements SerializerConfig {}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
index fc672b9dc0..eea364d460 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
@@ -48,7 +48,7 @@ public class KafkaDebeziumAvroDeserializationSchema
public KafkaDebeziumAvroDeserializationSchema(Configuration
cdcSourceConfig) {
this.topic = KafkaActionUtils.findOneTopic(cdcSourceConfig);
- this.schemaRegistryUrl =
cdcSourceConfig.getString(SCHEMA_REGISTRY_URL);
+ this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 64f1275711..df288a4150 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -83,7 +83,7 @@ public interface MongoVersionStrategy {
Configuration mongodbConfig)
throws JsonProcessingException {
SchemaAcquisitionMode mode =
-
SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
+
SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
ObjectNode objectNode =
JsonSerdeUtil.asSpecificNodeType(jsonNode.asText(),
ObjectNode.class);
JsonNode idNode = objectNode.get(ID_FIELD);
@@ -92,7 +92,7 @@ public interface MongoVersionStrategy {
"The provided MongoDB JSON document does not contain an
_id field.");
}
JsonNode document =
- mongodbConfig.getBoolean(DEFAULT_ID_GENERATION)
+ mongodbConfig.get(DEFAULT_ID_GENERATION)
? objectNode.set(
ID_FIELD,
idNode.get(OID_FIELD) == null ? idNode :
idNode.get(OID_FIELD))
@@ -101,8 +101,8 @@ public interface MongoVersionStrategy {
case SPECIFIED:
return parseFieldsFromJsonRecord(
document.toString(),
- mongodbConfig.getString(PARSER_PATH),
- mongodbConfig.getString(FIELD_NAME),
+ mongodbConfig.get(PARSER_PATH),
+ mongodbConfig.get(FIELD_NAME),
computedColumns,
rowTypeBuilder);
case DYNAMIC:
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 502e6237a4..26579e718f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -45,6 +45,8 @@ import io.debezium.relational.history.TableChanges;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -99,11 +101,14 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
String stringifyServerTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
- this.isDebeziumSchemaCommentsEnabled =
- mySqlConfig.getBoolean(
- DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
- +
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
- false);
+ ConfigOption<Boolean> includeSchemaCommentsConfig =
+ ConfigOptions.key(
+ DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ +
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
+ .name())
+ .booleanType()
+ .defaultValue(false);
+ this.isDebeziumSchemaCommentsEnabled =
mySqlConfig.get(includeSchemaCommentsConfig);
this.serverTimeZone =
stringifyServerTimeZone == null
? ZoneId.systemDefault()
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java
index b0d1d1bf62..f45ee034be 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarDebeziumAvroDeserializationSchema.java
@@ -46,7 +46,7 @@ public class PulsarDebeziumAvroDeserializationSchema
public PulsarDebeziumAvroDeserializationSchema(Configuration
cdcSourceConfig) {
this.topic = PulsarActionUtils.findOneTopic(cdcSourceConfig);
- this.schemaRegistryUrl =
cdcSourceConfig.getString(SCHEMA_REGISTRY_URL);
+ this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
index 394cdd1f14..f0328b5663 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
@@ -76,13 +76,12 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
@Test
public void testCreateSchemaFromValidConfig() {
Configuration mongodbConfig = new Configuration();
- mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
- mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
- mongodbConfig.setString(
- MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
- mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
- mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
- mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+ mongodbConfig.set(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.set(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.set(MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig);
assertNotNull(schema);
}
@@ -90,13 +89,12 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
@Test
public void testCreateSchemaFromInvalidHost() {
Configuration mongodbConfig = new Configuration();
- mongodbConfig.setString(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
- mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
- mongodbConfig.setString(
- MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
- mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
- mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
- mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+ mongodbConfig.set(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
+ mongodbConfig.set(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.set(MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
assertThrows(
RuntimeException.class, () ->
MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
@@ -106,7 +104,7 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
public void testCreateSchemaFromIncompleteConfig() {
// Create a Configuration object with missing necessary settings
Configuration mongodbConfig = new Configuration();
- mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.set(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
// Expect an exception to be thrown due to missing necessary settings
assertThrows(
NullPointerException.class,
@@ -117,13 +115,12 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
public void testCreateSchemaFromDynamicConfig() {
// Create a Configuration object with the necessary settings
Configuration mongodbConfig = new Configuration();
- mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
- mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
- mongodbConfig.setString(
- MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
- mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
- mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
- mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+ mongodbConfig.set(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.set(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.set(MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
// Call the method and check the results
Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig);
@@ -142,13 +139,12 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
@Test
public void testCreateSchemaFromInvalidDatabase() {
Configuration mongodbConfig = new Configuration();
- mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
- mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
- mongodbConfig.setString(
- MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
- mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
- mongodbConfig.setString(MongoDBSourceOptions.DATABASE,
"invalidDatabase");
- mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+ mongodbConfig.set(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.set(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.set(MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.set(MongoDBSourceOptions.DATABASE, "invalidDatabase");
+ mongodbConfig.set(MongoDBSourceOptions.COLLECTION, "testCollection");
assertThrows(
RuntimeException.class, () ->
MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
@@ -157,13 +153,12 @@ public class MongodbSchemaITCase extends
MongoDBActionITCaseBase {
@Test
public void testCreateSchemaFromInvalidCollection() {
Configuration mongodbConfig = new Configuration();
- mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
- mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
- mongodbConfig.setString(
- MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
- mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
- mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
- mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"invalidCollection");
+ mongodbConfig.set(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.set(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.set(MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.set(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.set(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.set(MongoDBSourceOptions.COLLECTION,
"invalidCollection");
assertThrows(
RuntimeException.class, () ->
MongodbSchemaUtils.getMongodbSchema(mongodbConfig));
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index bdeab07a74..febbe4e1de 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -31,7 +31,8 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.BeforeAll;
@@ -1285,8 +1286,11 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
mySqlConfig.put("database-name", "default_checkpoint");
mySqlConfig.put("table-name", "t");
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.noRestart());
+ // Using `none` to avoid compatibility issues with Flink 1.18-.
+ Configuration configuration = new Configuration();
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig).build();
action.withStreamExecutionEnvironment(env);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
index 698900436e..b202ca53c9 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
@@ -25,6 +25,8 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -35,6 +37,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -49,7 +53,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
public class CdcRecordSerializeITCase {
@Test
- public void testCdcRecordKryoSerialize() throws IOException {
+ public void testCdcRecordKryoSerialize() throws Exception {
KryoSerializer<RichCdcMultiplexRecord> kr =
createFlinkKryoSerializer(RichCdcMultiplexRecord.class);
RowType.Builder rowType = RowType.builder();
@@ -78,7 +82,7 @@ public class CdcRecordSerializeITCase {
}
@Test
- public void testUnmodifiableListKryoSerialize() throws IOException {
+ public void testUnmodifiableListKryoSerialize() throws Exception {
KryoSerializer<List> kryoSerializer =
createFlinkKryoSerializer(List.class);
RowType.Builder rowType = RowType.builder();
rowType.field("id", new BigIntType());
@@ -101,8 +105,24 @@ public class CdcRecordSerializeITCase {
assertThat(deserializeRecord).isEqualTo(fields);
}
- public static <T> KryoSerializer<T> createFlinkKryoSerializer(Class<T>
type) {
- return new KryoSerializer<>(type, new ExecutionConfig());
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <T> KryoSerializer<T> createFlinkKryoSerializer(Class<T>
type)
+ throws NoSuchMethodException, InvocationTargetException,
InstantiationException,
+ IllegalAccessException {
+ try {
+ Constructor<KryoSerializer> constructor =
+ KryoSerializer.class.getConstructor(Class.class,
SerializerConfig.class);
+ return (KryoSerializer<T>) constructor.newInstance(type, new
SerializerConfigImpl());
+ } catch (NoSuchMethodException
+ | InvocationTargetException
+ | IllegalAccessException
+ | InstantiationException e) {
+ // to stay compatible with Flink 1.18-
+ }
+
+ Constructor<KryoSerializer> constructor =
+ KryoSerializer.class.getConstructor(Class.class,
ExecutionConfig.class);
+ return (KryoSerializer<T>) constructor.newInstance(type, new
ExecutionConfig());
}
private static final class TestOutputView extends DataOutputStream
implements DataOutputView {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
index 5cae899a07..a529e6764f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogTaskTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.compact.changelog;
import
org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -56,7 +57,17 @@ public class ChangelogTaskTypeInfo extends
TypeInformation<ChangelogCompactTask>
return false;
}
- @Override
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public TypeSerializer<ChangelogCompactTask> createSerializer(
+ SerializerConfig serializerConfig) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public TypeSerializer<ChangelogCompactTask>
createSerializer(ExecutionConfig config) {
// we don't need copy for task
return new
NoneCopyVersionedSerializerTypeSerializerProxy<ChangelogCompactTask>(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
index dcb87238b8..92e826a913 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -57,7 +58,16 @@ public class CommittableTypeInfo extends
TypeInformation<Committable> {
return false;
}
- @Override
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public TypeSerializer<Committable> createSerializer(SerializerConfig
config) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public TypeSerializer<Committable> createSerializer(ExecutionConfig
config) {
// no copy, so that data from writer is directly going into committer
while chaining
return new NoneCopyVersionedSerializerTypeSerializerProxy<Committable>(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
index 47defa61a9..6510a85b80 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
@@ -22,6 +22,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -58,7 +59,16 @@ public class CompactionTaskTypeInfo extends
TypeInformation<UnawareAppendCompact
return false;
}
- @Override
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public TypeSerializer<UnawareAppendCompactionTask>
createSerializer(SerializerConfig config) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public TypeSerializer<UnawareAppendCompactionTask>
createSerializer(ExecutionConfig config) {
// we don't need copy for task
return new
NoneCopyVersionedSerializerTypeSerializerProxy<UnawareAppendCompactionTask>(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 59f2f4b103..dd364c196d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -338,13 +337,11 @@ public abstract class FlinkSink<T> implements
Serializable {
checkArgument(
!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
"Paimon sink currently does not support unaligned checkpoints.
Please set "
- + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
- + " to false.");
+ + "execution.checkpointing.unaligned.enabled to
false.");
checkArgument(
env.getCheckpointConfig().getCheckpointingMode() ==
CheckpointingMode.EXACTLY_ONCE,
"Paimon sink currently only supports EXACTLY_ONCE checkpoint
mode. Please set "
- +
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
- + " to exactly-once");
+ + "execution.checkpointing.mode to exactly-once");
}
public static void assertBatchAdaptiveParallelism(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
index f82f082098..7da0ae0e20 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -57,7 +58,16 @@ public class MultiTableCommittableTypeInfo extends
TypeInformation<MultiTableCom
return false;
}
- @Override
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public TypeSerializer<MultiTableCommittable>
createSerializer(SerializerConfig config) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public TypeSerializer<MultiTableCommittable>
createSerializer(ExecutionConfig config) {
// no copy, so that data from writer is directly going into committer
while chaining
return new
NoneCopyVersionedSerializerTypeSerializerProxy<MultiTableCommittable>(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
index f27f29f87f..0116ff1988 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.table.sink.MultiTableCompactionTaskSerializer;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
@@ -60,7 +61,17 @@ public class MultiTableCompactionTaskTypeInfo
return false;
}
- @Override
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public TypeSerializer<MultiTableUnawareAppendCompactionTask>
createSerializer(
+ SerializerConfig serializerConfig) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public TypeSerializer<MultiTableUnawareAppendCompactionTask>
createSerializer(
ExecutionConfig executionConfig) {
return new SimpleVersionedSerializerTypeSerializerProxy<
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index b3dcd4840c..e864ec0500 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -46,7 +46,6 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -331,30 +330,25 @@ public class FlinkSourceBuilder {
checkArgument(
checkpointConfig.isCheckpointingEnabled(),
"The align mode of paimon source is only supported when
checkpoint enabled. Please set "
- +
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key()
- + "larger than 0");
+ + "execution.checkpointing.interval larger than 0");
checkArgument(
checkpointConfig.getMaxConcurrentCheckpoints() == 1,
"The align mode of paimon source supports at most one ongoing
checkpoint at the same time. Please set "
- +
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.key()
- + " to 1");
+ + "execution.checkpointing.max-concurrent-checkpoints
to 1");
checkArgument(
checkpointConfig.getCheckpointTimeout()
>
conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT)
.toMillis(),
"The align mode of paimon source requires that the timeout of
checkpoint is greater than the timeout of the source's snapshot alignment.
Please increase "
- +
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()
- + " or decrease "
+ + "execution.checkpointing.timeout or decrease "
+
FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT.key());
checkArgument(
!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
"The align mode of paimon source currently does not support
unaligned checkpoints. Please set "
- + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
- + " to false.");
+ + "execution.checkpointing.unaligned.enabled to
false.");
checkArgument(
env.getCheckpointConfig().getCheckpointingMode() ==
CheckpointingMode.EXACTLY_ONCE,
"The align mode of paimon source currently only supports
EXACTLY_ONCE checkpoint mode. Please set "
- +
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
- + " to exactly-once");
+ + "execution.checkpointing.mode to exactly-once");
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index d6b7060763..705e1d9a7a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -73,7 +73,7 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
limit,
new FutureCompletingBlockingQueue<>(
context.getConfiguration()
-
.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)));
+
.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
index 4ea5db9f34..60898421dd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalTypeInfo.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -73,8 +74,17 @@ public class InternalTypeInfo<T> extends TypeInformation<T> {
return false;
}
- @Override
- public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public TypeSerializer<T> createSerializer(SerializerConfig config) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
+ public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig)
{
return serializer.duplicate();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
index a36243c5bd..4aea809b51 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/JavaTypeInfo.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.utils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -78,7 +79,16 @@ public class JavaTypeInfo<T extends Serializable> extends
TypeInformation<T>
return Comparable.class.isAssignableFrom(typeClass);
}
- @Override
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 1.18-.
+ */
+ public TypeSerializer<T> createSerializer(SerializerConfig config) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+ * Do not annotate with <code>@Override</code> here to maintain
compatibility with Flink 2.0+.
+ */
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
return new JavaSerializer<>(this.typeClass);
}
@@ -91,7 +101,9 @@ public class JavaTypeInfo<T extends Serializable> extends
TypeInformation<T>
@SuppressWarnings("rawtypes")
GenericTypeComparator comparator =
new GenericTypeComparator(
- sortOrderAscending,
createSerializer(executionConfig), this.typeClass);
+ sortOrderAscending,
+ new JavaSerializer<>(this.typeClass),
+ this.typeClass);
return (TypeComparator<T>) comparator;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 239043ff79..915c93680a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.BlockingIterator;
-import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -60,7 +59,7 @@ public class FileSystemCatalogITCase extends AbstractTestBase
{
tableEnvironmentBuilder()
.streamingMode()
.parallelism(1)
-
.setConf(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
+
.setString("execution.checkpointing.unaligned.enabled", "false")
.build();
path = getTempDirPath();
tEnv.executeSql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
index c46c4c3589..8df379a71b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
@@ -65,7 +65,7 @@ public class FlinkJobRecoveryITCase extends CatalogITCaseBase
{
.set(
CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
- .removeConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL);
+ .removeKey("execution.checkpointing.interval");
// insert source data
batchSql("INSERT INTO source_table1 VALUES (1, 'test-1', '20241030')");
@@ -219,10 +219,9 @@ public class FlinkJobRecoveryITCase extends
CatalogITCaseBase {
batchSql(sql);
}
- Configuration config =
- sEnv.getConfig()
- .getConfiguration()
- .set(StateRecoveryOptions.SAVEPOINT_PATH,
checkpointPath);
+ Configuration config = sEnv.getConfig().getConfiguration();
+ // use config string to stay compatible with flink 1.19-
+ config.setString("execution.state-recovery.path", checkpointPath);
for (Map.Entry<String, String> entry : recoverOptions.entrySet()) {
config.setString(entry.getKey(), entry.getValue());
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index 08969bddfd..d5747d2e28 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -26,7 +26,6 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
@@ -106,9 +105,10 @@ public class RescaleBucketITCase extends CatalogITCaseBase
{
assertThat(batchSql("SELECT * FROM
T3")).containsExactlyInAnyOrderElementsOf(committedData);
// step5: resume streaming job
+ // use config string to stay compatible with flink 1.19-
sEnv.getConfig()
.getConfiguration()
- .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+ .setString("execution.state-recovery.path", savepointPath);
JobClient resumedJobClient =
startJobAndCommitSnapshot(streamSql,
snapshotAfterRescale.id());
// stop job
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index f6dfb1b230..6ca78b088f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.TimeUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -50,7 +51,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
-import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -203,7 +203,11 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
batchSql("ALTER TABLE append_table SET
('continuous.discovery-interval' = '1 s')");
- sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
+ sEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ "execution.checkpointing.interval",
+
TimeUtils.formatWithHighestUnit(Duration.ofMillis(500)));
sEnv.executeSql(
"CREATE TEMPORARY TABLE Orders_in (\n"
+ " f0 INT,\n"
@@ -224,7 +228,11 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
batchSql("ALTER TABLE append_table SET
('compaction.early-max.file-num' = '4')");
batchSql("ALTER TABLE append_table SET
('continuous.discovery-interval' = '1 s')");
- sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(500));
+ sEnv.getConfig()
+ .getConfiguration()
+ .setString(
+ "execution.checkpointing.interval",
+
TimeUtils.formatWithHighestUnit(Duration.ofMillis(500)));
sEnv.executeSql(
"CREATE TEMPORARY TABLE Orders_in (\n"
+ " f0 INT,\n"
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
index 6b912d2e57..b1486deacb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/SinkSavepointITCase.java
@@ -137,7 +137,7 @@ public class SinkSavepointITCase extends AbstractTestBase {
.parallelism(1)
.allowRestart()
.setConf(conf)
- .setConf(StateBackendOptions.STATE_BACKEND,
"filesystem")
+ .setConf(StateBackendOptions.STATE_BACKEND, "hashmap")
.setConf(
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
"file://" + path + "/checkpoint")
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index ce0017eb18..ee838ed682 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.util;
import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.TimeUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.dag.Transformation;
@@ -29,7 +30,6 @@ import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
-import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -164,6 +164,11 @@ public class AbstractTestBase {
return this;
}
+ public TableEnvironmentBuilder setString(String key, String value) {
+ conf.setString(key, value);
+ return this;
+ }
+
public TableEnvironmentBuilder setConf(Configuration conf) {
this.conf.addAll(conf);
return this;
@@ -182,9 +187,10 @@ public class AbstractTestBase {
if (checkpointIntervalMs != null) {
tEnv.getConfig()
.getConfiguration()
- .set(
-
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
- Duration.ofMillis(checkpointIntervalMs));
+ .setString(
+ "execution.checkpointing.interval",
+ TimeUtils.formatWithHighestUnit(
+
Duration.ofMillis(checkpointIntervalMs)));
}
} else {
tEnv =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 86b0014eb3..9c3170f9a9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -23,8 +23,9 @@ import org.apache.paimon.flink.ReadWriteTableITCase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -75,12 +76,11 @@ public class ReadWriteTableTestUtil {
}
public static void init(String warehouse, int parallelism) {
- StreamExecutionEnvironment sExeEnv = buildStreamEnv(parallelism);
- sExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ // Using `none` to avoid compatibility issues with Flink 1.18-.
+ StreamExecutionEnvironment sExeEnv = buildStreamEnv(parallelism,
"none");
sEnv = StreamTableEnvironment.create(sExeEnv);
- bExeEnv = buildBatchEnv(parallelism);
- bExeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ bExeEnv = buildBatchEnv(parallelism, "none");
bEnv = StreamTableEnvironment.create(bExeEnv,
EnvironmentSettings.inBatchMode());
ReadWriteTableTestUtil.warehouse = warehouse;
@@ -95,16 +95,24 @@ public class ReadWriteTableTestUtil {
bEnv.useCatalog(catalog);
}
- public static StreamExecutionEnvironment buildStreamEnv(int parallelism) {
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ public static StreamExecutionEnvironment buildStreamEnv(
+ int parallelism, String restartStrategy) {
+ Configuration configuration = new Configuration();
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY,
restartStrategy);
+ final StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(100);
env.setParallelism(parallelism);
return env;
}
- public static StreamExecutionEnvironment buildBatchEnv(int parallelism) {
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ public static StreamExecutionEnvironment buildBatchEnv(
+ int parallelism, String restartStrategy) {
+ Configuration configuration = new Configuration();
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY,
restartStrategy);
+ final StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(parallelism);
return env;
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 74d2d7e1c3..2266a8484d 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -31,12 +31,12 @@ import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.TimeUtils;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
-import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
@@ -139,7 +139,9 @@ public abstract class HiveCatalogITCaseBase {
EnvironmentSettings.newInstance().inStreamingMode().build());
sEnv.getConfig()
.getConfiguration()
- .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofSeconds(1));
+ .setString(
+ "execution.checkpointing.interval",
+
TimeUtils.formatWithHighestUnit(Duration.ofSeconds(1)));
sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
1);
tEnv.executeSql(