This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c99eb54ce8f362d970c173a2a579e8fc2cccc8ac Author: fengli <ldliu...@163.com> AuthorDate: Mon May 6 20:19:49 2024 +0800 [FLINK-35195][table] Introduce ContinuousRefreshHandler and serializer for continuous refresh mode --- .../table/refresh/ContinuousRefreshHandler.java | 50 ++++++++++++++++++++++ .../ContinuousRefreshHandlerSerializer.java | 44 +++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java new file mode 100644 index 00000000000..60a92bed02e --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.refresh; + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +/** Embedded continuous refresh handler of Flink streaming job for materialized table. */ +@Internal +public class ContinuousRefreshHandler implements RefreshHandler, Serializable { + + // TODO: add clusterId for yarn and k8s resource manager + private final String executionTarget; + private final String jobId; + + public ContinuousRefreshHandler(String executionTarget, String jobId) { + this.executionTarget = executionTarget; + this.jobId = jobId; + } + + public String getExecutionTarget() { + return executionTarget; + } + + public String getJobId() { + return jobId; + } + + @Override + public String asSummaryString() { + return String.format("{\n executionTarget: %s,\n jobId: %s\n}", executionTarget, jobId); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandlerSerializer.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandlerSerializer.java new file mode 100644 index 00000000000..f62ccc99e09 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandlerSerializer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.refresh; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** Serializer for {@link ContinuousRefreshHandler}. */ +@Internal +public class ContinuousRefreshHandlerSerializer + implements RefreshHandlerSerializer<ContinuousRefreshHandler> { + + public static final ContinuousRefreshHandlerSerializer INSTANCE = + new ContinuousRefreshHandlerSerializer(); + + @Override + public byte[] serialize(ContinuousRefreshHandler refreshHandler) throws IOException { + return InstantiationUtil.serializeObject(refreshHandler); + } + + @Override + public ContinuousRefreshHandler deserialize(byte[] serializedBytes, ClassLoader cl) + throws IOException, ClassNotFoundException { + return InstantiationUtil.deserializeObject(serializedBytes, cl); + } +}