This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1378979f02e [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table 1378979f02e is described below commit 1378979f02eed55bbf3f91b08ec166d55b2c42a6 Author: Ron <ron9....@gmail.com> AuthorDate: Thu May 16 19:41:54 2024 +0800 [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table [FLINK-35346][table-common] Introduce workflow scheduler interface for materialized table This closes #24767 --- .../apache/flink/table/factories/FactoryUtil.java | 9 +- .../table/factories/WorkflowSchedulerFactory.java | 56 +++++++ .../factories/WorkflowSchedulerFactoryUtil.java | 156 ++++++++++++++++++ .../table/workflow/CreateRefreshWorkflow.java | 29 ++++ .../table/workflow/DeleteRefreshWorkflow.java | 48 ++++++ .../table/workflow/ModifyRefreshWorkflow.java | 40 +++++ .../flink/table/workflow/RefreshWorkflow.java | 34 ++++ .../flink/table/workflow/WorkflowException.java | 37 +++++ .../flink/table/workflow/WorkflowScheduler.java | 91 +++++++++++ .../workflow/TestWorkflowSchedulerFactory.java | 175 +++++++++++++++++++++ .../workflow/WorkflowSchedulerFactoryUtilTest.java | 107 +++++++++++++ .../org.apache.flink.table.factories.Factory | 1 + 12 files changed, 782 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index d8d6d7e9000..5d66b23c3d8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -167,6 +167,13 @@ public final class FactoryUtil { + "tasks to advance their watermarks without the need to wait for " + "watermarks from this source while it is idle."); + public static final ConfigOption<String> 
WORKFLOW_SCHEDULER_TYPE = + ConfigOptions.key("workflow-scheduler.type") + .stringType() + .noDefaultValue() + .withDescription( + "Specify the workflow scheduler type that is used for materialized table."); + /** * Suffix for keys of {@link ConfigOption} in case a connector requires multiple formats (e.g. * for both key and value). @@ -903,7 +910,7 @@ public final class FactoryUtil { return loadResults; } - private static String stringifyOption(String key, String value) { + public static String stringifyOption(String key, String value) { if (GlobalConfiguration.isSensitive(key)) { value = HIDDEN_CONTENT; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactory.java new file mode 100644 index 00000000000..72e144f7d19 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.workflow.WorkflowScheduler; + +import java.util.Map; + +/** + * A factory to create a {@link WorkflowScheduler} instance. + * + * <p>See {@link Factory} for more information about the general design of a factory. + */ +@PublicEvolving +public interface WorkflowSchedulerFactory extends Factory { + + /** Create a workflow scheduler instance which interacts with external scheduler service. */ + WorkflowScheduler<?> createWorkflowScheduler(Context context); + + /** Context provided when a workflow scheduler is created. */ + @PublicEvolving + interface Context { + + /** Gives the config option to create {@link WorkflowScheduler}. */ + ReadableConfig getConfiguration(); + + /** + * Returns the options with which the workflow scheduler is created. All options that are + * prefixed with the workflow scheduler identifier are included in the map. + * + * <p>All the keys in the options are pruned with the prefix. For example, the option {@code + * workflow-scheduler.airflow.endpoint}'s key is {@code endpoint} in the map. + * + * <p>An implementation should perform validation of these options. + */ + Map<String, String> getWorkflowSchedulerOptions(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java new file mode 100644 index 00000000000..593d6b47d6a --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/WorkflowSchedulerFactoryUtil.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.workflow.WorkflowScheduler; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; +import static org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE; +import static org.apache.flink.table.factories.FactoryUtil.stringifyOption; + +/** Utility for working with {@link WorkflowScheduler}. 
*/ +@PublicEvolving +public class WorkflowSchedulerFactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(WorkflowSchedulerFactoryUtil.class); + + public static final String WORKFLOW_SCHEDULER_PREFIX = "workflow-scheduler"; + + private WorkflowSchedulerFactoryUtil() { + // no instantiation + } + + /** + * Attempts to discover the appropriate workflow scheduler factory and creates the instance of + * the scheduler. Return null directly if doesn't specify the workflow scheduler in config + * because it is optional for materialized table. + */ + public static @Nullable WorkflowScheduler<?> createWorkflowScheduler( + Configuration configuration, ClassLoader classLoader) { + // Workflow scheduler identifier + String identifier = configuration.get(WORKFLOW_SCHEDULER_TYPE); + if (StringUtils.isNullOrWhitespaceOnly(identifier)) { + LOG.warn( + "Workflow scheduler options do not contain an option key '%s' for discovering an workflow scheduler."); + return null; + } + + try { + final WorkflowSchedulerFactory factory = + FactoryUtil.discoverFactory( + classLoader, WorkflowSchedulerFactory.class, identifier); + return factory.createWorkflowScheduler( + new DefaultWorkflowSchedulerContext( + configuration, getWorkflowSchedulerConfig(configuration, identifier))); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Error creating workflow scheduler '%s' in option space '%s'.", + identifier, + configuration.toMap().entrySet().stream() + .map( + optionEntry -> + stringifyOption( + optionEntry.getKey(), + optionEntry.getValue())) + .sorted() + .collect(Collectors.joining("\n"))), + t); + } + } + + private static Map<String, String> getWorkflowSchedulerConfig( + Configuration flinkConf, String identifier) { + return new DelegatingConfiguration(flinkConf, getWorkflowSchedulerOptionPrefix(identifier)) + .toMap(); + } + + private static String getWorkflowSchedulerOptionPrefix(String identifier) { + return String.format("%s.%s.", 
WORKFLOW_SCHEDULER_PREFIX, identifier); + } + + /** + * Creates a utility that helps to validate options for a {@link WorkflowSchedulerFactory}. + * + * <p>Note: This utility checks for left-over options in the final step. + */ + public static WorkflowSchedulerFactoryHelper createWorkflowSchedulerFactoryHelper( + WorkflowSchedulerFactory workflowSchedulerFactory, + WorkflowSchedulerFactory.Context context) { + return new WorkflowSchedulerFactoryHelper( + workflowSchedulerFactory, context.getWorkflowSchedulerOptions()); + } + + /** + * Helper utility for validating all options for a {@link WorkflowSchedulerFactory}. + * + * @see #createWorkflowSchedulerFactoryHelper(WorkflowSchedulerFactory, + * WorkflowSchedulerFactory.Context) + */ + @PublicEvolving + public static class WorkflowSchedulerFactoryHelper + extends FactoryUtil.FactoryHelper<WorkflowSchedulerFactory> { + + public WorkflowSchedulerFactoryHelper( + WorkflowSchedulerFactory workflowSchedulerFactory, + Map<String, String> configOptions) { + super(workflowSchedulerFactory, configOptions, PROPERTY_VERSION); + } + } + + /** Default implementation of {@link WorkflowSchedulerFactory.Context}. 
*/ + @Internal + public static class DefaultWorkflowSchedulerContext + implements WorkflowSchedulerFactory.Context { + + private final ReadableConfig configuration; + private final Map<String, String> workflowSchedulerConfig; + + public DefaultWorkflowSchedulerContext( + ReadableConfig configuration, Map<String, String> workflowSchedulerConfig) { + this.configuration = configuration; + this.workflowSchedulerConfig = workflowSchedulerConfig; + } + + @Override + public ReadableConfig getConfiguration() { + return configuration; + } + + @Override + public Map<String, String> getWorkflowSchedulerOptions() { + return workflowSchedulerConfig; + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/CreateRefreshWorkflow.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/CreateRefreshWorkflow.java new file mode 100644 index 00000000000..0ca0ebc37dc --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/CreateRefreshWorkflow.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogMaterializedTable; + +/** + * {@link CreateRefreshWorkflow} provides the related information to create refresh workflow of + * {@link CatalogMaterializedTable}. + */ +@PublicEvolving +public interface CreateRefreshWorkflow extends RefreshWorkflow {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/DeleteRefreshWorkflow.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/DeleteRefreshWorkflow.java new file mode 100644 index 00000000000..44508b71e7d --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/DeleteRefreshWorkflow.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.refresh.RefreshHandler; + +/** + * {@link DeleteRefreshWorkflow} provides the related information to delete refresh workflow of + * {@link CatalogMaterializedTable}. 
+ * + * @param <T> The type of {@link RefreshHandler} used by specific {@link WorkflowScheduler} to + * locate the refresh workflow in scheduler service. + */ +@PublicEvolving +public class DeleteRefreshWorkflow<T extends RefreshHandler> implements RefreshWorkflow { + + private final T refreshHandler; + + public DeleteRefreshWorkflow(T refreshHandler) { + this.refreshHandler = refreshHandler; + } + + /** + * Returns the {@link RefreshHandler} of the corresponding {@link WorkflowScheduler}, which + * provides the meta info that points to the refresh workflow in scheduler service. + */ + public T getRefreshHandler() { + return refreshHandler; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/ModifyRefreshWorkflow.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/ModifyRefreshWorkflow.java new file mode 100644 index 00000000000..f4e87ad6075 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/ModifyRefreshWorkflow.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.refresh.RefreshHandler; + +/** + * {@link ModifyRefreshWorkflow} provides the related information to modify refresh workflow of + * {@link CatalogMaterializedTable}. + * + * @param <T> The type of {@link RefreshHandler} used by specific {@link WorkflowScheduler} to + * locate the refresh workflow in scheduler service. + */ +@PublicEvolving +public interface ModifyRefreshWorkflow<T extends RefreshHandler> extends RefreshWorkflow { + + /** + * Returns the {@link RefreshHandler} of the corresponding {@link WorkflowScheduler}, which + * provides the meta info that points to the refresh workflow in scheduler service. + */ + T getRefreshHandler(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/RefreshWorkflow.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/RefreshWorkflow.java new file mode 100644 index 00000000000..0dd839f4b68 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/RefreshWorkflow.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogMaterializedTable; + +/** + * {@link RefreshWorkflow} is the basic interface that provides the related information to operate + * the refresh workflow of {@link CatalogMaterializedTable}. The operations on a refresh workflow + * include create, modify, delete, etc. + * + * @see CreateRefreshWorkflow + * @see ModifyRefreshWorkflow + * @see DeleteRefreshWorkflow + */ +@PublicEvolving +public interface RefreshWorkflow {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java new file mode 100644 index 00000000000..5155d5265ec --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.workflow; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A workflow-related operation exception to materialized table, including create, suspend, resume, + * drop workflow operation, etc. + */ +@PublicEvolving +public class WorkflowException extends Exception { + + public WorkflowException(String message) { + super(message); + } + + public WorkflowException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java new file mode 100644 index 00000000000..ce630a3241d --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/workflow/WorkflowScheduler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.workflow; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.refresh.RefreshHandler; +import org.apache.flink.table.refresh.RefreshHandlerSerializer; + +/** + * This interface is used to interact with specific workflow scheduler services that support + * creating, modifying, and deleting the refresh workflow of a materialized table. + * + * @param <T> The type of {@link RefreshHandler} used by specific {@link WorkflowScheduler} to + * locate the refresh workflow in scheduler service. + */ +@PublicEvolving +public interface WorkflowScheduler<T extends RefreshHandler> { + + /** + * Open this workflow scheduler instance. Used for any required preparation in initialization + * phase. + * + * @throws WorkflowException if an exception occurs while initializing the workflow scheduler + */ + void open() throws WorkflowException; + + /** + * Close this workflow scheduler when it is no longer needed and release any resource that it + * might be holding. + * + * @throws WorkflowException if closing the related resources of workflow scheduler failed + */ + void close() throws WorkflowException; + + /** + * Return a {@link RefreshHandlerSerializer} instance to serialize and deserialize {@link + * RefreshHandler} created by specific workflow scheduler service. + */ + RefreshHandlerSerializer<T> getRefreshHandlerSerializer(); + + /** + * Create a refresh workflow in specific scheduler service for the materialized table and return + * a {@link RefreshHandler} instance which can locate the refresh workflow detail information. + * + * <p>This method supports creating workflow for periodic refresh, as well as workflow for a + * one-time refresh only. + * + * @param createRefreshWorkflow The detail info for creating the refresh workflow of the + * materialized table. + * @return The meta info which points to the refresh workflow in scheduler service. 
+ * @throws WorkflowException if creating the refresh workflow failed + */ + T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException; + + /** + * Modify the refresh workflow status in scheduler service. This includes suspend, resume, + * modify schedule cron operation, and so on. + * + * @param modifyRefreshWorkflow The detail info for modifying the refresh workflow of the + * materialized table. + * @throws WorkflowException if modifying the refresh workflow failed + */ + void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) + throws WorkflowException; + + /** + * Delete the refresh workflow in scheduler service. + * + * @param deleteRefreshWorkflow The detail info for deleting the refresh workflow of the + * materialized table. + * @throws WorkflowException if deleting the refresh workflow failed + */ + void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) + throws WorkflowException; +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/TestWorkflowSchedulerFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/TestWorkflowSchedulerFactory.java new file mode 100644 index 00000000000..ae550947853 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/TestWorkflowSchedulerFactory.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories.workflow; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.factories.WorkflowSchedulerFactory; +import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil; +import org.apache.flink.table.refresh.RefreshHandler; +import org.apache.flink.table.refresh.RefreshHandlerSerializer; +import org.apache.flink.table.workflow.CreateRefreshWorkflow; +import org.apache.flink.table.workflow.DeleteRefreshWorkflow; +import org.apache.flink.table.workflow.ModifyRefreshWorkflow; +import org.apache.flink.table.workflow.WorkflowException; +import org.apache.flink.table.workflow.WorkflowScheduler; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** This class is an implementation of {@link WorkflowSchedulerFactory} for testing purposes. 
*/ +public class TestWorkflowSchedulerFactory implements WorkflowSchedulerFactory { + + public static final String IDENTIFIER = "test"; + + public static final ConfigOption<String> USERNAME = + ConfigOptions.key("user-name").stringType().noDefaultValue(); + public static final ConfigOption<String> PASSWORD = + ConfigOptions.key("password").stringType().noDefaultValue(); + public static final ConfigOption<String> PROJECT_NAME = + ConfigOptions.key("project-name").stringType().noDefaultValue(); + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(USERNAME); + options.add(PASSWORD); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(PROJECT_NAME); + return options; + } + + @Override + public WorkflowScheduler<?> createWorkflowScheduler(Context context) { + WorkflowSchedulerFactoryUtil.WorkflowSchedulerFactoryHelper helper = + WorkflowSchedulerFactoryUtil.createWorkflowSchedulerFactoryHelper(this, context); + helper.validate(); + + return new TestWorkflowScheduler( + helper.getOptions().get(USERNAME), + helper.getOptions().get(PASSWORD), + helper.getOptions().get(PROJECT_NAME)); + } + + /** Test workflow scheduler for discovery testing. 
*/ + public static class TestWorkflowScheduler implements WorkflowScheduler<TestRefreshHandler> { + + private final String userName; + private final String password; + private final String projectName; + + public TestWorkflowScheduler(String userName, String password, String projectName) { + this.userName = userName; + this.password = password; + this.projectName = projectName; + } + + @Override + public void open() throws WorkflowException {} + + @Override + public void close() throws WorkflowException {} + + @Override + public RefreshHandlerSerializer<TestRefreshHandler> getRefreshHandlerSerializer() { + return TestRefreshHandlerSerializer.INSTANCE; + } + + @Override + public TestRefreshHandler createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) + throws WorkflowException { + return TestRefreshHandler.INSTANCE; + } + + @Override + public void modifyRefreshWorkflow( + ModifyRefreshWorkflow<TestRefreshHandler> modifyRefreshWorkflow) + throws WorkflowException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteRefreshWorkflow( + DeleteRefreshWorkflow<TestRefreshHandler> deleteRefreshWorkflow) + throws WorkflowException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestWorkflowScheduler that = (TestWorkflowScheduler) o; + return Objects.equals(userName, that.userName) + && Objects.equals(password, that.password) + && Objects.equals(projectName, that.projectName); + } + + @Override + public int hashCode() { + return Objects.hash(userName, password, projectName); + } + } + + /** Test refresh handler for discovery testing. 
*/ + public static class TestRefreshHandler implements RefreshHandler { + + public static final TestRefreshHandler INSTANCE = new TestRefreshHandler(); + + @Override + public String asSummaryString() { + return "Test RefreshHandler"; + } + } + + /** Test refresh handler serializer for discovery testing. */ + public static class TestRefreshHandlerSerializer + implements RefreshHandlerSerializer<TestRefreshHandler> { + + public static final TestRefreshHandlerSerializer INSTANCE = + new TestRefreshHandlerSerializer(); + + @Override + public byte[] serialize(TestRefreshHandler refreshHandler) throws IOException { + return new byte[0]; + } + + @Override + public TestRefreshHandler deserialize(byte[] serializedBytes, ClassLoader cl) + throws IOException, ClassNotFoundException { + return TestRefreshHandler.INSTANCE; + } + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/WorkflowSchedulerFactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/WorkflowSchedulerFactoryUtilTest.java new file mode 100644 index 00000000000..d61ff5f04e2 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/workflow/WorkflowSchedulerFactoryUtilTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories.workflow; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.factories.WorkflowSchedulerFactory; +import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil; +import org.apache.flink.table.workflow.WorkflowScheduler; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil.createWorkflowScheduler; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link WorkflowSchedulerFactoryUtil}. 
*/
+public class WorkflowSchedulerFactoryUtilTest {
+
+    @Test
+    void testCreateWorkflowScheduler() {
+        // The default options select the "test" scheduler and supply all of its settings.
+        final WorkflowScheduler<?> created = discover(Configuration.fromMap(getDefaultConfig()));
+
+        assertThat(created)
+                .isEqualTo(
+                        new TestWorkflowSchedulerFactory.TestWorkflowScheduler(
+                                "user1", "9999", "project1"));
+    }
+
+    @Test
+    void testCreateWorkflowSchedulerWithoutType() {
+        // No scheduler type is configured: the util is expected to return null.
+        assertThat(discover(new Configuration())).isNull();
+    }
+
+    @Test
+    void testCreateWorkflowSchedulerWithUnknownType() {
+        final Map<String, String> config = getDefaultConfig();
+        config.put("workflow-scheduler.type", "unknown");
+        final String expectedMessage =
+                String.format(
+                        "Could not find any factory for identifier 'unknown' "
+                                + "that implements '%s' in the classpath.",
+                        WorkflowSchedulerFactory.class.getCanonicalName());
+
+        validateException(config, expectedMessage);
+    }
+
+    @Test
+    void testCreateWorkflowSchedulerWithMissingOptions() {
+        final Map<String, String> config = getDefaultConfig();
+        config.remove("workflow-scheduler.test.user-name");
+        final String expectedMessage =
+                "One or more required options are missing.\n\n"
+                        + "Missing required options are:\n\n"
+                        + "user-name";
+
+        validateException(config, expectedMessage);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Helpers
+    // --------------------------------------------------------------------------------------------
+
+    /** Invokes the factory util with the calling thread's context class loader. */
+    private WorkflowScheduler<?> discover(Configuration configuration) {
+        return createWorkflowScheduler(
+                configuration, Thread.currentThread().getContextClassLoader());
+    }
+
+    /** Expects creation from {@code options} to fail with the given validation error. */
+    private void validateException(Map<String, String> options, String errorMessage) {
+        assertThatThrownBy(() -> discover(Configuration.fromMap(options)))
+                .satisfies(anyCauseMatches(ValidationException.class, errorMessage));
+    }
+
+    /** Builds a fresh, mutable option map that fully configures the {@code test} scheduler. */
+    private Map<String, String> getDefaultConfig() {
+        final Map<String, String> defaults = new HashMap<>();
+        defaults.put("workflow-scheduler.type", "test");
+        defaults.put("workflow-scheduler.test.user-name", "user1");
+        defaults.put("workflow-scheduler.test.password", "9999");
+        defaults.put("workflow-scheduler.test.project-name", "project1");
+        return defaults;
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 22b140e3e7a..cad6079cdc2 100644
--- a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -25,3 +25,4 @@ org.apache.flink.table.factories.TestCatalogFactory
 org.apache.flink.table.factories.TestCatalogStoreFactory
 org.apache.flink.table.factories.TestManagedTableFactory
 org.apache.flink.table.factories.module.DummyModuleFactory
+org.apache.flink.table.factories.workflow.TestWorkflowSchedulerFactory