This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git
commit e956104c2e097891e03f62092844c687d984ebe0 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Fri Feb 4 19:07:22 2022 +0100 [FLINK-25935] Add LocalEnvironmentEntrypoint The LocalEnvironmentEntrypoint runs a Stateful Functions application within a single process. It uses Flink's LocalStreamExecutionEnvironment for this. The Entrypoint can be configured via * --module <PATH_TO_MODULE> * --set <CONFIG_OPTION>=<CONFIG_VALUE> --- .../statefun-playground-entrypoint/Dockerfile | 26 ++++ .../statefun-playground-entrypoint/README.md | 12 ++ .../statefun-playground-entrypoint/pom.xml | 145 +++++++++++++++++++++ .../entrypoint/ClassPathUniverseProvider.java | 40 ++++++ .../entrypoint/ConfigurationValidator.java | 85 ++++++++++++ .../entrypoint/LocalEnvironmentEntrypoint.java | 101 ++++++++++++++ .../src/main/resources/log4j.properties | 20 +++ 7 files changed, 429 insertions(+) diff --git a/playground-internal/statefun-playground-entrypoint/Dockerfile b/playground-internal/statefun-playground-entrypoint/Dockerfile new file mode 100644 index 0000000..0438cc9 --- /dev/null +++ b/playground-internal/statefun-playground-entrypoint/Dockerfile @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build the functions code ... 
+FROM maven:3.6.3-jdk-11 AS builder +COPY src /usr/src/app/src +COPY pom.xml /usr/src/app +RUN mvn -f /usr/src/app/pom.xml clean package + +# ... and run the entrypoint! +FROM openjdk:11 +WORKDIR / +COPY --from=builder /usr/src/app/target/statefun-playground-entrypoint*.jar statefun-playground-entrypoint.jar +ENTRYPOINT ["java", "-jar", "statefun-playground-entrypoint.jar"] diff --git a/playground-internal/statefun-playground-entrypoint/README.md b/playground-internal/statefun-playground-entrypoint/README.md new file mode 100644 index 0000000..07d1302 --- /dev/null +++ b/playground-internal/statefun-playground-entrypoint/README.md @@ -0,0 +1,12 @@ +# Stateful Functions Playground Entrypoint + +A simple Stateful Functions entrypoint that runs Stateful Functions within a single process. + +## Configuring a module.yaml + +By default, the `LocalEnvironmentEntrypoint` expects a `module.yaml` to be on the classpath. +Alternatively, one can provide a different location via `--module file://<PATH>`. + +## Configuring the Flink Runtime + +One can configure the underlying Flink runtime via `--set <CONFIG_OPTION>=<CONFIG_VALUE>`. \ No newline at end of file diff --git a/playground-internal/statefun-playground-entrypoint/pom.xml b/playground-internal/statefun-playground-entrypoint/pom.xml new file mode 100644 index 0000000..77cd8ca --- /dev/null +++ b/playground-internal/statefun-playground-entrypoint/pom.xml @@ -0,0 +1,145 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.flink</groupId> + <artifactId>statefun-playground-entrypoint</artifactId> + <version>3.2.0</version> + <packaging>jar</packaging> + + <properties> + <statefun.version>3.2.0</statefun.version> + <flink.version>1.14.3</flink.version> + <slf4j.version>1.7.35</slf4j.version> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + </properties> + + <dependencies> + <!-- StateFun Core --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>statefun-flink-core</artifactId> + <version>${statefun.version}</version> + </dependency> + <!-- StateFun Distribution --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>statefun-flink-io-bundle</artifactId> + <version>${statefun.version}</version> + </dependency> + + <!-- Flink dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.12</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.12</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-runtime-web_2.12</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_2.12</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- Logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Build a fat executable jar --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.2.4</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom>false</createDependencyReducedPom> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.statefun.playground.internal.entrypoint.LocalEnvironmentEntrypoint</mainClass> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Java code style --> + <plugin> + <groupId>com.diffplug.spotless</groupId> + <artifactId>spotless-maven-plugin</artifactId> + <version>1.20.0</version> + <configuration> + <java> + <googleJavaFormat> + <version>1.7</version> + <style>GOOGLE</style> + </googleJavaFormat> + <removeUnusedImports/> + </java> + </configuration> + <executions> + <execution> + <id>spotless-check</id> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> 
+ </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ClassPathUniverseProvider.java b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ClassPathUniverseProvider.java new file mode 100644 index 0000000..dfd48a9 --- /dev/null +++ b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ClassPathUniverseProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.playground.internal.entrypoint; + +import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; +import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; +import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider; +import org.apache.flink.statefun.flink.core.spi.Modules; + +/** + * This class duplicates StatefulFunctionsUniverses.ClassPathUniverseProvider because it is not + * public. Needs a new Statefun release to change the visibility. 
@Todo Update Statefun to expose + * this functionality + */ +final class ClassPathUniverseProvider implements StatefulFunctionsUniverseProvider { + + private static final long serialVersionUID = 1; + + @Override + public StatefulFunctionsUniverse get( + ClassLoader classLoader, StatefulFunctionsConfig configuration) { + Modules modules = Modules.loadFromClassPath(configuration); + return modules.createStatefulFunctionsUniverse(); + } +} diff --git a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ConfigurationValidator.java b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ConfigurationValidator.java new file mode 100644 index 0000000..133c8cb --- /dev/null +++ b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ConfigurationValidator.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.playground.internal.entrypoint; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; +import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException; +import org.apache.flink.statefun.flink.core.message.MessageFactoryType; +import org.apache.flink.util.StringUtils; + +public final class ConfigurationValidator { + + private ConfigurationValidator() {} + + static void validate(Configuration configuration) { + validateCustomPayloadSerializerClassName(configuration); + validateNoHeapBackedTimers(configuration); + validateUnalignedCheckpointsDisabled(configuration); + } + + private static void validateCustomPayloadSerializerClassName(Configuration configuration) { + + MessageFactoryType factoryType = + configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER); + String customPayloadSerializerClassName = + configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS); + + if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) { + if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) { + throw new StatefulFunctionsInvalidConfigException( + StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS, + "custom payload serializer class must be supplied with WITH_CUSTOM_PAYLOADS serializer"); + } + } else { + if (customPayloadSerializerClassName != null) { + throw new StatefulFunctionsInvalidConfigException( + StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS, + "custom payload serializer class may only be supplied with WITH_CUSTOM_PAYLOADS serializer"); + } + } + } + + private static final ConfigOption<String> TIMER_SERVICE_FACTORY = + ConfigOptions.key("state.backend.rocksdb.timer-service.factory") + .stringType() + 
.defaultValue("rocksdb"); + + private static final ConfigOption<Boolean> ENABLE_UNALIGNED_CHECKPOINTS = + ConfigOptions.key("execution.checkpointing.unaligned").booleanType().defaultValue(false); + + private static void validateNoHeapBackedTimers(Configuration configuration) { + final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY); + if (!timerFactory.equalsIgnoreCase("rocksdb")) { + throw new StatefulFunctionsInvalidConfigException( + TIMER_SERVICE_FACTORY, + "StateFun only supports non-heap timers with a rocksdb state backend."); + } + } + + private static void validateUnalignedCheckpointsDisabled(Configuration configuration) { + final boolean unalignedCheckpoints = configuration.getBoolean(ENABLE_UNALIGNED_CHECKPOINTS); + if (unalignedCheckpoints) { + throw new StatefulFunctionsInvalidConfigException( + ENABLE_UNALIGNED_CHECKPOINTS, + "StateFun currently does not support unaligned checkpointing."); + } + } +} diff --git a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java new file mode 100644 index 0000000..fb43e85 --- /dev/null +++ b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.playground.internal.entrypoint; + +import java.util.Collection; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; +import org.apache.flink.statefun.flink.core.StatefulFunctionsJob; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Entrypoint that starts a local Flink environment to run the given Stateful Functions application + * in this process. 
+ */ +public final class LocalEnvironmentEntrypoint { + private static final Logger LOG = LoggerFactory.getLogger(LocalEnvironmentEntrypoint.class); + + private static final String MODULE_OPTION = "module"; + private static final String CONFIGURATION_OPTION = "set"; + + public static void main(String[] args) throws Exception { + final Configuration flinkConfiguration = parseConfiguration(args); + ConfigurationValidator.validate(flinkConfiguration); + + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConfiguration); + + final StatefulFunctionsConfig stateFunConfig = + StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfiguration); + stateFunConfig.setProvider(new ClassPathUniverseProvider()); + + StatefulFunctionsJob.main(env, stateFunConfig); + } + + private static Configuration parseConfiguration(String[] args) { + final MultipleParameterTool parameterTool = MultipleParameterTool.fromArgs(args); + final Configuration flinkConfiguration = createDefaultLocalEnvironmentFlinkConfiguration(); + parseModuleOption(parameterTool, flinkConfiguration); + parseConfigurationOptions(parameterTool, flinkConfiguration); + + return flinkConfiguration; + } + + private static void parseConfigurationOptions( + MultipleParameterTool parameterTool, Configuration flinkConfiguration) { + final Collection<String> configurationOptions = + parameterTool.getMultiParameter(CONFIGURATION_OPTION); + + if (configurationOptions != null) { + for (String configurationOption : configurationOptions) { + final String[] splits = configurationOption.split("="); + + if (splits.length != 2) { + throw new IllegalArgumentException( + String.format("The '--%s' value must have the form 'key=value'", CONFIGURATION_OPTION)); + } + + final String key = splits[0]; + final String value = splits[1]; + LOG.info("Setting configuration value: {}={}", key, value); + flinkConfiguration.setString(key, value); + } + } + } + + private static void 
parseModuleOption( + MultipleParameterTool parameterTool, Configuration flinkConfiguration) { + final String module = parameterTool.get(MODULE_OPTION, "file:///module.yaml"); + + LOG.info("Setting module.yaml to: {}", module); + flinkConfiguration.set(StatefulFunctionsConfig.REMOTE_MODULE_NAME, module); + } + + private static Configuration createDefaultLocalEnvironmentFlinkConfiguration() { + final Configuration flinkConfiguration = new Configuration(); + flinkConfiguration.set(StateBackendOptions.STATE_BACKEND, "rocksdb"); + flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); + + return flinkConfiguration; + } +} diff --git a/playground-internal/statefun-playground-entrypoint/src/main/resources/log4j.properties b/playground-internal/statefun-playground-entrypoint/src/main/resources/log4j.properties new file mode 100644 index 0000000..8d9b95d --- /dev/null +++ b/playground-internal/statefun-playground-entrypoint/src/main/resources/log4j.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n