This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8576178c4c084c38897e395479df11f15a4ea402 Author: fengli <ldliu...@163.com> AuthorDate: Tue Apr 23 10:02:16 2024 +0800 [FLINK-35188][table-api] Introduce RefreshHandler interface to support materialized table --- .../apache/flink/table/refresh/RefreshHandler.java | 47 ++++++++++++++++++++++ .../table/refresh/RefreshHandlerSerializer.java | 35 ++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/RefreshHandler.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/RefreshHandler.java new file mode 100644 index 00000000000..16a9b9ab1ae --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/RefreshHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.refresh; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.Catalog; + +/** + * This interface represents the meta information of current materialized table background refresh + * pipeline. The refresh mode maybe continuous or full, the meta information in the two modes is not + * consistent, so user need to implementation this interface according to different case. + * + * <p>In continuous mode, the meta information maybe contains { "clusterType": "yarn", "clusterId": + * "xxx", "jobId": "yyyy" }. + * + * <p>In full mode, the meta information maybe contains { "endpoint": "xxx", "workflowId": "yyy" }. + * Due to user may use different workflow scheduler in this mode, user should implement this + * interface according to their plugin. + * + * <p>This interface will be serialized to bytes by {@link RefreshHandlerSerializer}, then store to + * {@link Catalog} for further operation. + */ +@PublicEvolving +public interface RefreshHandler { + + /** + * Returns a string that summarizes this refresh handler meta information for printing to a + * console or log. + */ + String asSummaryString(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/RefreshHandlerSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/RefreshHandlerSerializer.java new file mode 100644 index 00000000000..c8693e71089 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/RefreshHandlerSerializer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.refresh; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.IOException; + +/** This interface is used to serialize and deserialize the {@link RefreshHandler}. */ +@PublicEvolving +public interface RefreshHandlerSerializer<T extends RefreshHandler> { + + /** Serialize the {@link RefreshHandler} instance to bytes. */ + byte[] serialize(T refreshHandler) throws IOException; + + /** Deserialize the bytes to a {@link RefreshHandler} instance. */ + T deserialize(byte[] serializedBytes, ClassLoader cl) + throws IOException, ClassNotFoundException; +}