This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 29aee6d [hotfix] Fix the config casting in TableStoreFactory.enrichOptions 29aee6d is described below commit 29aee6d14fce88ef5ba16ca52da48a83bad84f07 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Thu Mar 17 11:35:57 2022 +0800 [hotfix] Fix the config casting in TableStoreFactory.enrichOptions This closes #49 --- .../table/store/connector/TableStoreFactory.java | 3 +- .../store/connector/utils/TableConfigUtils.java | 55 ++++++++++++++++++++++ .../table/store/connector/TableStoreTestBase.java | 6 ++- 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java index 731f5c3..198ba1b 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java @@ -37,6 +37,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.ManagedTableFactory; import org.apache.flink.table.store.connector.sink.TableStoreSink; import org.apache.flink.table.store.connector.source.TableStoreSource; +import org.apache.flink.table.store.connector.utils.TableConfigUtils; import org.apache.flink.table.store.file.FileStoreOptions; import org.apache.flink.table.store.file.mergetree.MergeTreeOptions; import org.apache.flink.table.store.log.LogOptions; @@ -68,7 +69,7 @@ public class TableStoreFactory @Override public Map<String, String> enrichOptions(Context context) { Map<String, String> enrichedOptions = new HashMap<>(context.getCatalogTable().getOptions()); - ((Configuration) context.getConfiguration()) + TableConfigUtils.extractConfiguration(context.getConfiguration()) .toMap() .forEach( (k, v) -> { diff 
--git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/TableConfigUtils.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/TableConfigUtils.java new file mode 100644 index 0000000..eff9d43 --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/TableConfigUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableConfig; + +import java.lang.reflect.Field; + +/** Utils for {@link TableConfig}. 
*/ +public class TableConfigUtils { + + public static Configuration extractConfiguration(ReadableConfig readableConfig) { + return extractConfiguration(readableConfig, new Configuration()); + } + + private static Configuration extractConfiguration(ReadableConfig from, Configuration to) { + if (from instanceof Configuration) { + return (Configuration) from; + } + + if (!(from instanceof TableConfig)) { + throw new RuntimeException("Unknown readableConfig type: " + from.getClass()); + } + + TableConfig tableConfig = (TableConfig) from; + try { + Field rootField = TableConfig.class.getDeclaredField("rootConfiguration"); + rootField.setAccessible(true); + ReadableConfig rootConfig = (ReadableConfig) rootField.get(tableConfig); + extractConfiguration(rootConfig, to); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + to.addAll(tableConfig.getConfiguration()); + return to; + } +} diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java index 4abc7f9..a1795ff 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java @@ -20,6 +20,7 @@ package org.apache.flink.table.store.connector; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentImpl; @@ -85,11 +86,12 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase { @Before public void setup() { super.setup(); - env.setRuntimeMode(executionMode); + 
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().inBatchMode(); if (executionMode == RuntimeExecutionMode.STREAMING) { env.enableCheckpointing(100); + builder.inStreamingMode(); } - tEnv = StreamTableEnvironment.create(env); + tEnv = StreamTableEnvironment.create(env, builder.build()); ((TableEnvironmentImpl) tEnv) .getCatalogManager() .registerCatalog(