This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun-playground.git

commit e956104c2e097891e03f62092844c687d984ebe0
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Fri Feb 4 19:07:22 2022 +0100

    [FLINK-25935] Add LocalEnvironmentEntrypoint
    
    The LocalEnvironmentEntrypoint runs a Stateful Functions applications 
within a
    single process. It uses Flink's LocalStreamExecutionEnvironment for this. 
The
    Entrypoint can be configured via
    
    * --module <PATH_TO_MODULE>
    * --set <CONFIG_OPTION>=<CONFIG_VALUE>
---
 .../statefun-playground-entrypoint/Dockerfile      |  26 ++++
 .../statefun-playground-entrypoint/README.md       |  12 ++
 .../statefun-playground-entrypoint/pom.xml         | 145 +++++++++++++++++++++
 .../entrypoint/ClassPathUniverseProvider.java      |  40 ++++++
 .../entrypoint/ConfigurationValidator.java         |  85 ++++++++++++
 .../entrypoint/LocalEnvironmentEntrypoint.java     | 101 ++++++++++++++
 .../src/main/resources/log4j.properties            |  20 +++
 7 files changed, 429 insertions(+)

diff --git a/playground-internal/statefun-playground-entrypoint/Dockerfile 
b/playground-internal/statefun-playground-entrypoint/Dockerfile
new file mode 100644
index 0000000..0438cc9
--- /dev/null
+++ b/playground-internal/statefun-playground-entrypoint/Dockerfile
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Build the functions code ...
+FROM maven:3.6.3-jdk-11 AS builder
+COPY src /usr/src/app/src
+COPY pom.xml /usr/src/app
+RUN mvn -f /usr/src/app/pom.xml clean package
+
+# ... and run the web server!
+FROM openjdk:11
+WORKDIR /
+COPY --from=builder /usr/src/app/target/statefun-playground-entrypoint*.jar 
statefun-playground-entrypoint.jar
+ENTRYPOINT ["java", "-jar", "statefun-playground-entrypoint.jar"]
diff --git a/playground-internal/statefun-playground-entrypoint/README.md 
b/playground-internal/statefun-playground-entrypoint/README.md
new file mode 100644
index 0000000..07d1302
--- /dev/null
+++ b/playground-internal/statefun-playground-entrypoint/README.md
@@ -0,0 +1,12 @@
+# Stateful Functions Playground Entrypoint
+
+A simple Stateful Functions entrypoint that runs Stateful Functions within a 
single process.
+
+## Configuring a module.yaml
+
+Per default the `LocalEnvironmentEntrypoint` expects a `module.yaml` to be on 
the classpath.
+Alternatively, one can provide a different location via `--module 
file://<PATH>`.
+
+## Configuring the Flink Runtime
+
+One can configure the underlying Flink runtime via `--set 
<CONFIG_OPTION>=<CONFIG_VALUE>`.
\ No newline at end of file
diff --git a/playground-internal/statefun-playground-entrypoint/pom.xml 
b/playground-internal/statefun-playground-entrypoint/pom.xml
new file mode 100644
index 0000000..77cd8ca
--- /dev/null
+++ b/playground-internal/statefun-playground-entrypoint/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>statefun-playground-entrypoint</artifactId>
+    <version>3.2.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <statefun.version>3.2.0</statefun.version>
+        <flink.version>1.14.3</flink.version>
+        <slf4j.version>1.7.35</slf4j.version>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <!-- StateFun Core -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-core</artifactId>
+            <version>${statefun.version}</version>
+        </dependency>
+        <!-- StateFun Distribution -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-io-bundle</artifactId>
+            <version>${statefun.version}</version>
+        </dependency>
+
+        <!-- Flink dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.12</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.12</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web_2.12</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Build a fat executable jar -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            
<createDependencyReducedPom>false</createDependencyReducedPom>
+                            <transformers>
+                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    
<mainClass>org.apache.flink.statefun.playground.internal.entrypoint.LocalEnvironmentEntrypoint</mainClass>
+                                </transformer>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- Java code style -->
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>1.20.0</version>
+                <configuration>
+                    <java>
+                        <googleJavaFormat>
+                            <version>1.7</version>
+                            <style>GOOGLE</style>
+                        </googleJavaFormat>
+                        <removeUnusedImports/>
+                    </java>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>spotless-check</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ClassPathUniverseProvider.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ClassPathUniverseProvider.java
new file mode 100644
index 0000000..dfd48a9
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ClassPathUniverseProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.playground.internal.entrypoint;
+
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider;
+import org.apache.flink.statefun.flink.core.spi.Modules;
+
+/**
+ * This class duplicates StatefulFunctionsUniverses.ClassPathUniverseProvider 
because it is not
+ * public. Needs a new Statefun release to change the visibility. @Todo Update 
Statefun to expose
+ * this functionality
+ */
+final class ClassPathUniverseProvider implements 
StatefulFunctionsUniverseProvider {
+
+  private static final long serialVersionUID = 1;
+
+  @Override
+  public StatefulFunctionsUniverse get(
+      ClassLoader classLoader, StatefulFunctionsConfig configuration) {
+    Modules modules = Modules.loadFromClassPath(configuration);
+    return modules.createStatefulFunctionsUniverse();
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ConfigurationValidator.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ConfigurationValidator.java
new file mode 100644
index 0000000..133c8cb
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/ConfigurationValidator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.playground.internal.entrypoint;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import 
org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
+import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import org.apache.flink.util.StringUtils;
+
+public final class ConfigurationValidator {
+
+  private ConfigurationValidator() {}
+
+  static void validate(Configuration configuration) {
+    validateCustomPayloadSerializerClassName(configuration);
+    validateNoHeapBackedTimers(configuration);
+    validateUnalignedCheckpointsDisabled(configuration);
+  }
+
+  private static void validateCustomPayloadSerializerClassName(Configuration 
configuration) {
+
+    MessageFactoryType factoryType =
+        configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+    String customPayloadSerializerClassName =
+        
configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+    if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+      if 
(StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {
+        throw new StatefulFunctionsInvalidConfigException(
+            
StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS,
+            "custom payload serializer class must be supplied with 
WITH_CUSTOM_PAYLOADS serializer");
+      }
+    } else {
+      if (customPayloadSerializerClassName != null) {
+        throw new StatefulFunctionsInvalidConfigException(
+            
StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS,
+            "custom payload serializer class may only be supplied with 
WITH_CUSTOM_PAYLOADS serializer");
+      }
+    }
+  }
+
+  private static final ConfigOption<String> TIMER_SERVICE_FACTORY =
+      ConfigOptions.key("state.backend.rocksdb.timer-service.factory")
+          .stringType()
+          .defaultValue("rocksdb");
+
+  private static final ConfigOption<Boolean> ENABLE_UNALIGNED_CHECKPOINTS =
+      
ConfigOptions.key("execution.checkpointing.unaligned").booleanType().defaultValue(false);
+
+  private static void validateNoHeapBackedTimers(Configuration configuration) {
+    final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY);
+    if (!timerFactory.equalsIgnoreCase("rocksdb")) {
+      throw new StatefulFunctionsInvalidConfigException(
+          TIMER_SERVICE_FACTORY,
+          "StateFun only supports non-heap timers with a rocksdb state 
backend.");
+    }
+  }
+
+  private static void validateUnalignedCheckpointsDisabled(Configuration 
configuration) {
+    final boolean unalignedCheckpoints = 
configuration.getBoolean(ENABLE_UNALIGNED_CHECKPOINTS);
+    if (unalignedCheckpoints) {
+      throw new StatefulFunctionsInvalidConfigException(
+          ENABLE_UNALIGNED_CHECKPOINTS,
+          "StateFun currently does not support unaligned checkpointing.");
+    }
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
new file mode 100644
index 0000000..fb43e85
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/entrypoint/LocalEnvironmentEntrypoint.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.playground.internal.entrypoint;
+
+import java.util.Collection;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsJob;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entrypoint that starts a local Flink environment to run the given Stateful 
Functions application
+ * in this process.
+ */
+public final class LocalEnvironmentEntrypoint {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalEnvironmentEntrypoint.class);
+
+  private static final String MODULE_OPTION = "module";
+  private static final String CONFIGURATION_OPTION = "set";
+
+  public static void main(String[] args) throws Exception {
+    final Configuration flinkConfiguration = parseConfiguration(args);
+    ConfigurationValidator.validate(flinkConfiguration);
+
+    final StreamExecutionEnvironment env =
+        
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConfiguration);
+
+    final StatefulFunctionsConfig stateFunConfig =
+        StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfiguration);
+    stateFunConfig.setProvider(new ClassPathUniverseProvider());
+
+    StatefulFunctionsJob.main(env, stateFunConfig);
+  }
+
+  private static Configuration parseConfiguration(String[] args) {
+    final MultipleParameterTool parameterTool = 
MultipleParameterTool.fromArgs(args);
+    final Configuration flinkConfiguration = 
createDefaultLocalEnvironmentFlinkConfiguration();
+    parseModuleOption(parameterTool, flinkConfiguration);
+    parseConfigurationOptions(parameterTool, flinkConfiguration);
+
+    return flinkConfiguration;
+  }
+
+  private static void parseConfigurationOptions(
+      MultipleParameterTool parameterTool, Configuration flinkConfiguration) {
+    final Collection<String> configurationOptions =
+        parameterTool.getMultiParameter(CONFIGURATION_OPTION);
+
+    if (configurationOptions != null) {
+      for (String configurationOption : configurationOptions) {
+        final String[] splits = configurationOption.split("=");
+
+        if (splits.length != 2) {
+          throw new IllegalArgumentException(
+              String.format("The '--%s' value must have the form 'key=value'", 
CONFIGURATION_OPTION));
+        }
+
+        final String key = splits[0];
+        final String value = splits[1];
+        LOG.info("Setting configuration value: {}={}", key, value);
+        flinkConfiguration.setString(key, value);
+      }
+    }
+  }
+
+  private static void parseModuleOption(
+      MultipleParameterTool parameterTool, Configuration flinkConfiguration) {
+    final String module = parameterTool.get(MODULE_OPTION, 
"file:///module.yaml");
+
+    LOG.info("Setting module.yaml to: {}", module);
+    flinkConfiguration.set(StatefulFunctionsConfig.REMOTE_MODULE_NAME, module);
+  }
+
+  private static Configuration 
createDefaultLocalEnvironmentFlinkConfiguration() {
+    final Configuration flinkConfiguration = new Configuration();
+    flinkConfiguration.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+    flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+
+    return flinkConfiguration;
+  }
+}
diff --git 
a/playground-internal/statefun-playground-entrypoint/src/main/resources/log4j.properties
 
b/playground-internal/statefun-playground-entrypoint/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8d9b95d
--- /dev/null
+++ 
b/playground-internal/statefun-playground-entrypoint/src/main/resources/log4j.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n

Reply via email to