[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124952#comment-16124952 ]
ASF GitHub Bot commented on FLINK-5886: --------------------------------------- Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132843982 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api.environment; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.python.api.datastream.PythonDataStream; +import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction; +import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction; +import org.apache.flink.streaming.python.api.functions.UtilityFunctions; +import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer; +import org.python.core.PyObject; +import org.python.core.PyString; +import org.python.core.PyInteger; +import org.python.core.PyLong; +import org.python.core.PyUnicode; +import org.python.core.PyTuple; +import org.python.core.PyObjectDerived; +import org.python.core.PyInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; + + +/** + * A thin wrapper layer over {@link StreamExecutionEnvironment}. + * + * <p>The PythonStreamExecutionEnvironment is the context in which a streaming program is executed. 
+ * </p> + * + * <p>The environment provides methods to control the job execution (such as setting the parallelism + * or the fault tolerance/checkpointing parameters) and to interact with the outside world + * (data access).</p> + */ +@PublicEvolving +public class PythonStreamExecutionEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class); + private final StreamExecutionEnvironment env; + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes + * care for required Jython serializers registration. + * + * @return The python execution environment of the context in which the program is + * executed. + */ + public static PythonStreamExecutionEnvironment get_execution_environment() { + return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment()); + } + + /** + * Creates a {@link LocalStreamEnvironment}. The local execution environment + * will run the program in a multi-threaded fashion in the same JVM as the + * environment was created in. The default parallelism of the local + * environment is the number of hardware contexts (CPU cores / threads), + * unless it was specified differently by {@link #setParallelism(int)}. + * + * @param config + * Pass a custom configuration into the cluster + * @return A local execution environment with the specified parallelism. + */ + public static PythonStreamExecutionEnvironment create_local_execution_environment(Configuration config) { + return new PythonStreamExecutionEnvironment(new LocalStreamEnvironment(config)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createLocalEnvironment(int, Configuration)} + * + * @param parallelism + * The parallelism for the local environment. + * @param config + * Pass a custom configuration into the cluster + * @return A local python execution environment with the specified parallelism. 
+ */ + public static PythonStreamExecutionEnvironment create_local_execution_environment(int parallelism, Configuration config) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createLocalEnvironment(parallelism, config)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment(java.lang.String, int, java.lang.String...)} + * + * @param host + * The host name or address of the master (JobManager), where the + * program should be executed. + * @param port + * The port of the master (JobManager), where the program should + * be executed. + * @param jar_files + * The JAR files with code that needs to be shipped to the + * cluster. If the program uses user-defined functions, + * user-defined input formats, or any libraries, those must be + * provided in the JAR files. + * @return A remote environment that executes the program on a cluster. + */ + public static PythonStreamExecutionEnvironment create_remote_execution_environment( + String host, int port, String... jar_files) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment( + * java.lang.String, int, Configuration, java.lang.String...)} + * + * @param host + * The host name or address of the master (JobManager), where the + * program should be executed. + * @param port + * The port of the master (JobManager), where the program should + * be executed. + * @param config + * The configuration used by the client that connects to the remote cluster. + * @param jar_files + * The JAR files with code that needs to be shipped to the + * cluster. If the program uses user-defined functions, + * user-defined input formats, or any libraries, those must be + * provided in the JAR files. + * @return A remote environment that executes the program on a cluster. 
+ * + */ + public static PythonStreamExecutionEnvironment create_remote_execution_environment( + String host, int port, Configuration config, String... jar_files) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files)); + } + + /** + * A thin wrapper layer over {@link StreamExecutionEnvironment#createRemoteEnvironment( + * java.lang.String, int, int, java.lang.String...)} + * + * @param host + * The host name or address of the master (JobManager), where the + * program should be executed. + * @param port + * The port of the master (JobManager), where the program should + * be executed. + * @param parallelism + * The parallelism to use during the execution. + * @param jar_files + * The JAR files with code that needs to be shipped to the + * cluster. If the program uses user-defined functions, + * user-defined input formats, or any libraries, those must be + * provided in the JAR files. + * @return A remote environment that executes the program on a cluster. + */ + public static PythonStreamExecutionEnvironment create_remote_execution_environment( + String host, int port, int parallelism, String... 
jar_files) { + return new PythonStreamExecutionEnvironment( + StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files)); + } + + private PythonStreamExecutionEnvironment(StreamExecutionEnvironment env) { + this.env = env; + this.registerJythonSerializers(); + } + + private void registerJythonSerializers() { + this.env.registerTypeWithKryoSerializer(PyString.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyInteger.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyLong.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyUnicode.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyTuple.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyObjectDerived.class, PyObjectSerializer.class); + this.env.registerTypeWithKryoSerializer(PyInstance.class, PyObjectSerializer.class); + } + + public PythonDataStream create_python_source(SourceFunction<Object> src) throws Exception { --- End diff -- Probably yes. All Jython user-defined functions/Classes are generated under `org.python.proxies` scope. So, using the class canonical name, we can identify these functions. WDYT? > Python API for streaming applications > ------------------------------------- > > Key: FLINK-5886 > URL: https://issues.apache.org/jira/browse/FLINK-5886 > Project: Flink > Issue Type: New Feature > Components: Python API > Reporter: Zohar Mizrahi > Assignee: Zohar Mizrahi > > A work in progress to provide python interface for Flink streaming APIs. The > core technology is based on jython and thus imposes two limitations: a. user > defined functions cannot use python extensions. b. the python version is 2.x > The branch is based on Flink release 1.2.0, as can be found here: > https://github.com/zohar-pm/flink/tree/python-streaming > In order to test it, someone can use IntelliJ IDE. 
Assuming IntelliJ was > set up properly (see: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), > one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, > which in turn will execute all the tests under > {{flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)