[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/5333 Just started to look at your changes and have one comment with respect to plan.py - have you tried executing the same script twice, changing one line of the script (e.g. a map function) the second time? Make sure the change takes effect on the second run. (BTW - I'm not sure I'll be able to spend much time reviewing the whole change set in the near future) ---
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 Referring to the issue with ```PythonEnvironmentConfig``` above, is there any other global indication that I can use to test whether a given function is executed on the TaskManager? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 One of the critical attributes is ```PythonEnvironmentConfig::pythonTmpCachePath```, which is used in the following places: - ```PythonStreamExecutionEnvironment::execute:362``` - ```PythonStreamExecutionEnvironment::execute:400``` - ```PythonStreamBinder::prepareFiles:117``` On the client side, the temporary files are prepared for distribution by the ```PythonStreamBinder``` and then processed by the ```PythonStreamExecutionEnvironment::execute``` function, which is called from the Python script. When the Python script is executed on the TaskManager, this attribute remains ```null``` and thus ```execute``` returns immediately. ---
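To make the client/TaskManager split described above concrete, here is a minimal, self-contained sketch of the guard pattern. It does not use the real `PythonEnvironmentConfig` or `PythonStreamExecutionEnvironment` classes; the static field and `execute()` method below are hypothetical stand-ins:

```java
/**
 * Hypothetical stand-in for the guard described in the comment: on the client,
 * PythonStreamBinder fills in the cache path before the script calls execute();
 * on the TaskManager the field is never set, so execute() returns immediately.
 */
class EnvironmentConfigSketch {
    /** Stand-in for PythonEnvironmentConfig.pythonTmpCachePath. */
    static String pythonTmpCachePath = null;

    /** Stand-in for PythonStreamExecutionEnvironment.execute(). */
    static String execute() {
        if (pythonTmpCachePath == null) {
            // Running on a TaskManager: the files were already distributed,
            // so there is nothing left to do here.
            return "skipped";
        }
        // Client side: ship the prepared temporary files and submit the job.
        return "submitted from " + pythonTmpCachePath;
    }
}
```

The same `execute()` entry point thus behaves differently depending on which side of the cluster it runs on, keyed entirely off that one attribute.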
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 The thing is that I use the ```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver information from the ```PythonStreamBinder``` to a class that is called from the Python script. How would you suggest doing it otherwise? ---
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 I'm trying to track down the root cause of the check failures, without success. Obviously, the given project (flink-libraries/flink-streaming-python) on the master branch passes `verify` successfully in my environment. Please advise. ---
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 Regarding the exception - ```java.io.IOException: java.io.IOException: The given HDFS file URI ...``` In general, using the Python interface requires a valid configuration of a shared file system (e.g. HDFS), which is designed to distribute the Python files. One can bypass this issue by setting the second argument to 'True' when calling ```env.execute(...)``` in the Python script. ---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r133392462 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * A serializer implementation for PyObject class type. Is is used by the Kryo serialization + * framework. {@see https://github.com/EsotericSoftware/kryo#serializers} + */ +public class PyObjectSerializer extends Serializer { + + public void write (Kryo kryo, Output output, PyObject po) { + try { + byte[] serPo = SerializationUtils.serializeObject(po); + output.writeInt(serPo.length); + output.write(serPo); + } catch (IOException e) { + e.printStackTrace(); --- End diff -- My bad - I missed the Java unchecked exceptions part (e.g. runtime exceptions). 
It'll be much better to use it here. As for the `read`, we can either return `null` or again use the runtime exception. ---
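The change agreed on above - wrapping the checked `IOException` in an unchecked `RuntimeException` instead of calling `e.printStackTrace()` - can be sketched as follows. Plain `java.io` serialization stands in here for the Kryo/`SerializationUtils` pair used in the PR, and the class name `SerializationSketch` is hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * Sketch of the exception-handling change: a serialization failure is wrapped
 * in a RuntimeException so it surfaces to the caller instead of being swallowed.
 */
class SerializationSketch {
    static byte[] serialize(Object obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bos);
            out.writeObject(obj);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            // Fail loudly rather than printing the stack trace and continuing.
            throw new RuntimeException("Failed to serialize " + obj, e);
        }
    }

    static Object deserialize(byte[] bytes) {
        try {
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            // Same pattern for read: rethrow unchecked instead of returning null.
            throw new RuntimeException("Failed to deserialize payload", e);
        }
    }
}
```

With this shape, `write` and `read` need no checked-exception declarations, which keeps them compatible with the Kryo `Serializer` signatures.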
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r133192783 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/PythonCollector.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util; + +import org.apache.flink.annotation.Public; +import org.apache.flink.streaming.python.api.functions.UtilityFunctions; +import org.apache.flink.util.Collector; +import org.python.core.PyObject; + +/** + * Collects a {@code PyObject} record and forwards it. It makes sure that the record is converted, + * if necessary, to a {@code PyObject}. + */ +@Public +public class PythonCollector implements Collector { + private Collector collector; + + public void setCollector(Collector collector) { + this.collector = collector; + } + + @Override + public void collect(PyObject record) { + PyObject po = UtilityFunctions.adapt(record); --- End diff -- Actually you're right. This line can be dropped. 
And now it seems that the `PythonCollector` is redundant, though it provides an extra safety layer that reports casting problems to users in case they provide a UDF from Java source code. If their function does not handle `PyObject`, it will break at runtime with an error like "object cannot be cast to 'PyObject'". ---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r133170150 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}. + * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code OutputSelector}. 
+ * + * This function is used internally by the Python thin wrapper layer over the streaming data + * functionality + */ +public class PythonOutputSelector implements OutputSelector { + private static final long serialVersionUID = 909266346633598177L; + + private final byte[] serFun; + private transient OutputSelector fun; + + public PythonOutputSelector(OutputSelector fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public Iterable select(PyObject value) { + if (this.fun == null) { + try { + this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun); + } catch (IOException e) { + e.printStackTrace(); --- End diff -- Done. ---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r133169625 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * A serializer implementation for PyObject class type. Is is used by the Kryo serialization + * framework. 
{@see https://github.com/EsotericSoftware/kryo#serializers} + */ +public class PyObjectSerializer extends Serializer { + + public void write (Kryo kryo, Output output, PyObject po) { + try { + byte[] serPo = SerializationUtils.serializeObject(po); + output.writeInt(serPo.length); + output.write(serPo); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public PyObject read (Kryo kryo, Input input, Class type) { + int len = input.readInt(); + byte[] serPo = new byte[len]; + input.read(serPo); + PyObject po = null; + try { + po = (PyObject) SerializationUtils.deserializeObject(serPo); + } catch (IOException e) { + e.printStackTrace(); --- End diff -- Yes. Same as the answer above. In order to verify it, during debugging, I set the returned value to null and it continued without issues. ---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r133169331 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/util/serialization/PyObjectSerializer.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.util.serialization; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * A serializer implementation for PyObject class type. Is is used by the Kryo serialization + * framework. {@see https://github.com/EsotericSoftware/kryo#serializers} + */ +public class PyObjectSerializer extends Serializer { + + public void write (Kryo kryo, Output output, PyObject po) { + try { + byte[] serPo = SerializationUtils.serializeObject(po); + output.writeInt(serPo.length); + output.write(serPo); + } catch (IOException e) { + e.printStackTrace(); --- End diff -- Yes, indeed. 
According to https://github.com/EsotericSoftware/kryo#serializers: "By default, serializers do not need to handle the object being null." But anyway, I added a print message to the log. ---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r133156452 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/functions/PythonOutputSelector.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.python.api.functions; + +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.python.util.serialization.SerializationUtils; +import org.python.core.PyObject; + +import java.io.IOException; + +/** + * The {@code PythonOutputSelector} is a thin wrapper layer over a Python UDF {@code OutputSelector}. + * It receives an {@code OutputSelector} as an input and keeps it internally in a serialized form. + * It is then delivered, as part of the job graph, up to the TaskManager, then it is opened and becomes + * a sort of mediator to the Python UDF {@code OutputSelector}. 
+ * + * This function is used internally by the Python thin wrapper layer over the streaming data + * functionality + */ +public class PythonOutputSelector implements OutputSelector { + private static final long serialVersionUID = 909266346633598177L; + + private final byte[] serFun; + private transient OutputSelector fun; + + public PythonOutputSelector(OutputSelector fun) throws IOException { + this.serFun = SerializationUtils.serializeObject(fun); + } + + @Override + @SuppressWarnings("unchecked") + public Iterable select(PyObject value) { + if (this.fun == null) { + try { + this.fun = (OutputSelector) SerializationUtils.deserializeObject(this.serFun); + } catch (IOException e) { + e.printStackTrace(); --- End diff -- The thing is that the signature of the `select` function in the `OutputSelector` interface does not declare any thrown exceptions. This is why I catch the exceptions here and check in the following lines whether `this.fun` is null. If it is `null`, the function returns null. How can I do it otherwise? ---
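The pattern under discussion - ship the UDF as a byte array in the job graph, rebuild the transient field lazily on first use, and rethrow any failure as an unchecked exception because the interface method declares none - can be sketched in a self-contained way. The `Selector`/`UpperCaseSelector` names below are hypothetical stand-ins for the real `OutputSelector` and its Python-backed implementation, and plain `java.io` serialization replaces `SerializationUtils`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;

/** Sketch of lazy UDF deserialization with unchecked-exception rethrow. */
class LazySelectorSketch implements Serializable {
    /** Hypothetical stand-in for OutputSelector. */
    interface Selector extends Serializable {
        Iterable<String> select(String value);
    }

    /** Hypothetical stand-in for a user-provided UDF. */
    static class UpperCaseSelector implements Selector {
        public Iterable<String> select(String value) {
            return Collections.singletonList(value.toUpperCase());
        }
    }

    private final byte[] serFun;     // shipped with the job graph
    private transient Selector fun;  // rebuilt lazily on the TaskManager

    LazySelectorSketch(Selector fun) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bos);
        out.writeObject(fun);
        out.flush();
        this.serFun = bos.toByteArray();
    }

    Iterable<String> select(String value) {
        if (fun == null) {
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(serFun))) {
                fun = (Selector) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // select() declares no checked exceptions, so wrap and rethrow
                // instead of returning null and hiding the failure.
                throw new RuntimeException("Could not restore selector UDF", e);
            }
        }
        return fun.select(value);
    }
}
```

Rethrowing avoids the null-return path entirely: a deserialization failure fails the task immediately with its root cause attached, rather than surfacing later as a confusing NullPointerException.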
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r133154903 --- Diff: flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api; + +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.streaming.python.api.PythonStreamBinder; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +public class PythonStreamBinderTest extends StreamingProgramTestBase { + final private static String defaultPythonScriptName = "run_all_tests.py"; + final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python"; + final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api"; + + public PythonStreamBinderTest() { + } + + public static void main(String[] args) throws Exception { + if (args.length == 0) { + args = prepareDefaultArgs(); + } else { + args[0] = findStreamTestFile(args[0]).getAbsolutePath(); + } + PythonStreamBinder.main(args); + } + + @Override + public void testProgram() throws Exception { + this.main(new String[]{}); + } + + private static String[] prepareDefaultArgs() throws Exception { + File testFullPath = findStreamTestFile(defaultPythonScriptName); --- End diff -- Instead I've just provided a simplified version of it. Will send for review. ---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132967578 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.python.api.datastream; + +import org.python.util.PythonObjectInputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectStreamClass; + +/** + * A helper class to overcome the inability to set the serialVersionUID in a python user-defined + * function (UDF). + * The fact the this field is not set, results in a dynamic calculation of this serialVersionUID, + * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the + * class name, interface class names, methods, and fields. If a Python class inherits from a + * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is + * constructed using the following pattern: + * {@code org.python.proxies.$$}. The {@code } + * part is increased by one in runtime, for every job submission. 
It results in different serial + version UID for each run for the same Python class. Therefore, it is required to silently + suppress the serial version UID mismatch check. + */ +public class PythonObjectInputStream2 extends PythonObjectInputStream { + + public PythonObjectInputStream2(InputStream in) throws IOException { + super(in); + } + + protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException { + ObjectStreamClass resultClassDescriptor = super.readClassDescriptor(); // initially streams descriptor + + Class localClass; + try { + localClass = resolveClass(resultClassDescriptor); + } catch (ClassNotFoundException e) { + System.out.println("No local class for " + resultClassDescriptor.getName()); --- End diff -- The purpose of this function is to try to return the class according to the given description. If it fails, it probably means that the Jython interpreter has not been initialized yet, and so it gets initialized. This is handled in `org.apache.flink.streaming.python.api.functions.UtilityFunctions::smartFunctionDeserialization`. I'm currently checking whether the `catch` here is redundant and a left-over from the debugging phase. We can probably let the exception propagate up the call stack. ---
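The descriptor-swapping idea described in that javadoc can be sketched with plain `java.io`. The real class extends Jython's `PythonObjectInputStream`; the name `LenientObjectInputStream` below is hypothetical and `ObjectInputStream` stands in for the parent. When the stream's class descriptor resolves to a local class, the local descriptor is answered instead, so its serialVersionUID wins and the per-run Jython proxy UID no longer triggers an `InvalidClassException`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/** Sketch of silently overriding a serialVersionUID mismatch on read. */
class LenientObjectInputStream extends ObjectInputStream {
    LenientObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected ObjectStreamClass readClassDescriptor()
            throws IOException, ClassNotFoundException {
        ObjectStreamClass streamDescriptor = super.readClassDescriptor();
        try {
            Class<?> localClass = resolveClass(streamDescriptor);
            ObjectStreamClass localDescriptor = ObjectStreamClass.lookup(localClass);
            // Prefer the local descriptor: its serialVersionUID is used for
            // the match, so a per-run UID in the stream is silently accepted.
            return localDescriptor != null ? localDescriptor : streamDescriptor;
        } catch (ClassNotFoundException e) {
            // No local counterpart; fall back to what the stream declares.
            return streamDescriptor;
        }
    }
}
```

This is the standard JDK hook for tolerating UID drift; the trade-off is that genuine incompatible class changes are also waved through, which is acceptable here only because the Python class itself is unchanged between runs.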
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132843982 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api.environment; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.python.api.datastream.PythonDataStream; +import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction; +import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction; +import org.apache.flink.streaming.python.api.functions.UtilityFunctions; +import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer; +import org.python.core.PyObject; +import org.python.core.PyString; +import org.python.core.PyInteger; +import org.python.core.PyLong; +import org.python.core.PyUnicode; +import org.python.core.PyTuple; +import org.python.core.PyObjectDerived; +import org.python.core.PyInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; + + +/** + * A thin wrapper layer over {@link StreamExecutionEnvironment}. + * + * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed. 
+ * + * + * The environment provides methods to control the job execution (such as setting the parallelism + * or the fault tolerance/checkpointing parameters) and to interact with the outside world + * (data access). + */ +@PublicEvolving +public class PythonStreamExecutionEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class); + private final StreamExecutionEnvironment env; + + /** +* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes +* care for required Jython serializers registration. +* +* @return The python execution environment of the context in which the program is +* executed. +*/ + public static PythonStreamExecutionEnvironment get_execution_environment() { + return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment()); + } + + /** +* Creates a {@link LocalStreamEnvironment}. The local execution environment +* will run the program in a multi-threaded fashion in the same JVM as the +* environment was created in. The default parallelism of the local +* environment is the number of hardware contexts (CPU cores / threads), +* unless it was specified differently by {@link #setParallelism(int)}. +* +* @param configuration +* Pass a custom configuration into the cluster +* @return A local execution environment with the specified parallelism. +
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132842489

--- Diff: flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+	final private static String defaultPythonScriptName = "run_all_tests.py";
+	final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+	final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+	public PythonStreamBinderTest() {
+	}
+
+	public static void main(String[] args) throws Exception {
+		if (args.length == 0) {
+			args = prepareDefaultArgs();
+		} else {
+			args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+		}
+		PythonStreamBinder.main(args);
+	}
+
+	@Override
+	public void testProgram() throws Exception {
+		this.main(new String[]{});
+	}
+
+	private static String[] prepareDefaultArgs() throws Exception {
+		File testFullPath = findStreamTestFile(defaultPythonScriptName);

--- End diff --

I agree that the function names here are a bit confusing. In essence, this function locates a single test file, while `getFilesInFolder` in the next code line collects files that start with `test_`, so the main test file `run_all_tests.py` is filtered out and not included. To make this more readable and robust, I changed `getFilesInFolder` to receive an additional `excludes` argument and call it with the main test file in `excludes`.

---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
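The exclusion scheme described in the comment above — collect the `test_*` scripts while skipping the driver script — can be sketched with plain JDK calls. The names here (`getFilesInFolder`, the prefix, the demo directory) are illustrative, not the PR's actual helpers:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TestDiscovery {

    /** Collects regular files whose names start with prefix, skipping any names in excludes. */
    static List<File> getFilesInFolder(File folder, String prefix, List<String> excludes) {
        List<File> result = new ArrayList<>();
        File[] entries = folder.listFiles();
        if (entries == null) {
            return result; // not a directory or I/O error
        }
        for (File f : entries) {
            if (f.isFile() && f.getName().startsWith(prefix) && !excludes.contains(f.getName())) {
                result.add(f);
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Build a throwaway directory with two test scripts and the driver script.
        File dir = java.nio.file.Files.createTempDirectory("py-tests-demo").toFile();
        for (String name : new String[]{"test_map.py", "test_filter.py", "run_all_tests.py"}) {
            new File(dir, name).createNewFile();
        }
        List<File> tests = getFilesInFolder(dir, "test_", Arrays.asList("run_all_tests.py"));
        System.out.println(tests.size()); // the driver script is excluded
    }
}
```

Passing the driver script through `excludes` keeps the discovery correct even if it were ever renamed to something matching the `test_` prefix.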
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132841381

--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,104 @@
[quoted diff, with XML tags mangled in the archive: the new module pom, declaring flink-streaming-python_${scala.binary.version} under the flink-libraries 1.4-SNAPSHOT parent; a maven-jar-plugin configuration with main class org.apache.flink.streaming.python.api.PythonStreamBinder; provided-scope dependencies on flink-core, flink-java, flink-streaming-java_${scala.binary.version}, and flink-runtime_${scala.binary.version}; plus jython-standalone 2.7.0 and flink-connector-kafka-0.9_2.10]

--- End diff --

Done for the Scala version. As for not being able to run the tests:

1. I fixed the issue with the `TypeError: object of type 'java.lang.Class' has no len()`.
2. I still can't reproduce the main issue, concerning an import of a Java class from the Python module:
   File: `flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py`
   Line 19: `from org.apache.flink.api.java.utils import ParameterTool`
   The given class (ParameterTool) resides in a different project, `flink-java`, and the Jython module cannot find it. It probably concerns the CLASSPATH. Any suggestion for how to reproduce it?

---
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838

In the last change, I've rebased locally on top of origin/master, so I did `git push -f` to the master branch in my fork.

---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r117864592

--- Diff: flink-libraries/flink-streaming-python/pom.xml ---
@@ -0,0 +1,99 @@
[quoted diff, with XML tags mangled in the archive: the module pom at parent version 1.3-SNAPSHOT, declaring flink-streaming-python_2.10, the maven-jar-plugin configuration with main class org.apache.flink.streaming.python.api.PythonStreamBinder, and dependencies on flink-core and flink-java at ${project.version}]

--- End diff --

I'm not sure what you mean. Please explain.

---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r117807878

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -233,6 +234,20 @@ boolean holdsStillReference(String name, JobID jobId) {
 	// Utilities
 	//
+	/**
+	 * Remove a given path recursively if it exists. The path can be a filepath or directory.
+	 *
+	 * @param path The root path to remove.
+	 * @throws IOException
+	 * @throws URISyntaxException
+	 */
+	public static void clearPath(String path) throws IOException, URISyntaxException {

--- End diff --

My bad! I don't use it anymore.

---
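The `clearPath` helper quoted above amounted to a recursive delete guarded by an existence check. A JDK-only sketch of the same idea (illustrative; the PR's version was written against Flink's path utilities and was ultimately dropped):

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class ClearPathDemo {

    /** Removes a file or directory tree if it exists; silently returns otherwise. */
    static void clearPath(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                Files.delete(dir); // delete the directory only after its contents
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("clear-path-demo");
        Files.createFile(root.resolve("a.txt"));
        Path sub = Files.createDirectories(root.resolve("sub"));
        Files.createFile(sub.resolve("b.txt"));
        clearPath(root);
        System.out.println(Files.exists(root)); // false
    }
}
```

The existence check up front is what avoids the `NoSuchFileException` mentioned later in the FLINK-6283 discussion.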
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r117806756

--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@
[quoted diff: the Apache license header, package declaration, imports, the class Javadoc of the @Public PythonStreamExecutionEnvironment wrapper, the get_execution_environment() factory, and the Javadoc of the local-environment factory; the message is truncated mid-diff and the review comment itself is missing from the archive]
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r117793843

--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java ---
@@ -0,0 +1,442 @@
[quoted diff: the Apache license header, package declaration, imports, the class Javadoc of the @Public PythonStreamExecutionEnvironment wrapper, the get_execution_environment() factory, and the Javadoc of the local-environment factory; the message is truncated mid-diff and the review comment itself is missing from the archive]
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r117788892

--- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonObjectInputStream2.java ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.python.api.datastream;
+
+import org.python.util.PythonObjectInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * A helper class to overcome the inability to set the serialVersionUID in a python user-defined
+ * function (UDF).
+ * The fact that this field is not set results in a dynamic calculation of the serialVersionUID,
+ * using SHA, to make sure it is a unique number. This unique number is a 64-bit hash of the
+ * class name, interface class names, methods, and fields. If a Python class inherits from a
+ * Java class, as in the case of Python UDFs, then a proxy wrapper class is created. Its name is
+ * constructed using the following pattern: {@code org.python.proxies.$$}. The {@code }
+ * part is increased by one at runtime, for every job submission. This results in a different
+ * serialVersionUID for each run of the same Python class. Therefore, it is required to silently
+ * suppress the serialVersionUID mismatch check.
+ */
+public class PythonObjectInputStream2 extends PythonObjectInputStream {

--- End diff --

What would you suggest calling it? It extends `PythonObjectInputStream`.

---
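The standard trick for silently suppressing a serialVersionUID mismatch, as the wrapper above does for Jython proxy classes, is to override `ObjectInputStream#readClassDescriptor` and substitute the local class's descriptor when the UIDs differ. A minimal, self-contained sketch (plain JDK; not the PR's `PythonObjectInputStream2`, which builds on Jython's `PythonObjectInputStream`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class LenientDeserialization {

    /** Replaces a mismatched stream descriptor with the local class's descriptor. */
    static class LenientObjectInputStream extends ObjectInputStream {
        LenientObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
            ObjectStreamClass fromStream = super.readClassDescriptor();
            try {
                Class<?> local = Class.forName(fromStream.getName());
                ObjectStreamClass fromLocal = ObjectStreamClass.lookup(local);
                if (fromLocal != null
                        && fromLocal.getSerialVersionUID() != fromStream.getSerialVersionUID()) {
                    return fromLocal; // silently ignore the UID mismatch
                }
            } catch (ClassNotFoundException e) {
                // fall through: let the default mechanism report the missing class
            }
            return fromStream;
        }
    }

    /** A serializable payload with no explicit serialVersionUID, like a Python UDF proxy. */
    static class Payload implements Serializable {
        final String value;
        Payload(String value) {
            this.value = value;
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip a Payload through the lenient stream; the UIDs match within one
        // JVM, so this just demonstrates the descriptor hook working end to end.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(new Payload("ok"));
        }
        try (ObjectInputStream in =
                new LenientObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
            Payload p = (Payload) in.readObject();
            System.out.println(p.value);
        }
    }
}
```

In the Jython case the mismatch is real, because the generated proxy class name embeds a counter that changes per submission, so the lenient branch is exactly what gets taken.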
[GitHub] flink pull request #3826: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi closed the pull request at: https://github.com/apache/flink/pull/3826

---
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
GitHub user zohar-mizrahi opened a pull request: https://github.com/apache/flink/pull/3838

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zohar-mizrahi/flink master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3838.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3838

commit c1333c3424897caa615683d3494b41e7ab88d45d — Zohar Mizrahi, 2016-11-15T12:46:36Z: [FLINK-5886] Python API for streaming applications

---
[GitHub] flink pull request #3827: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi closed the pull request at: https://github.com/apache/flink/pull/3827

---
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996944#comment-15996944 ]

Zohar Mizrahi commented on FLINK-5886:
--------------------------------------

No problem - I'll rebase on top of master. As for #3826 - maybe you referred to another ticket, because I'm not familiar with this one.

> Python API for streaming applications
> -------------------------------------
>
>          Key: FLINK-5886
>          URL: https://issues.apache.org/jira/browse/FLINK-5886
>      Project: Flink
>   Issue Type: New Feature
>   Components: Python API
>     Reporter: Zohar Mizrahi
>     Assignee: Zohar Mizrahi
>
> A work in progress to provide a Python interface for the Flink streaming APIs. The core
> technology is based on Jython and thus imposes two limitations: a. user-defined functions
> cannot use Python extensions; b. the Python version is 2.x.
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, one can use the IntelliJ IDE. Assuming IntelliJ was set up properly (see:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}},
> which in turn will execute all the tests under
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3827: [FLINK-5886] Python API for streaming applications
GitHub user zohar-mizrahi opened a pull request: https://github.com/apache/flink/pull/3827

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zohar-mizrahi/flink python-streaming

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3827.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3827

commit 03684e8e460143013babb2ec88c66c8fa1119c43 — Zohar Mizrahi, 2017-04-09T09:11:57Z: [FLINK-6177] Add support for "Distributed Cache" in streaming applications
commit 7e6374f13a1846b6982923083ee98140c37d5903 — Zohar Mizrahi, 2017-04-20T08:19:38Z: [FLINK-6177] Apply suggested fixes from a review
commit 8606fafff92bbcea48c64a6accf87ec2b6802b46 — Zohar Mizrahi, 2017-04-20T16:38:49Z: [FLINK-6177] Combine the streaming & batch distributed cache tests and inherit from StreamingMultipleProgramsTestBase
commit fa74b64d100e51f4a8776b613714581c585b1ddd — Zohar Mizrahi, 2017-05-01T13:07:14Z: [FLINK-6177] Add missing file cache functions to the scala StreamExecutionEnvironment
commit cbbd86b59a4a5561fd383a704bc95c5c1c255449 — Zohar Mizrahi, 2016-11-15T12:46:36Z: [FLINK-5886] Python API for streaming applications

---
[GitHub] flink pull request #3826: [FLINK-5886] Python API for streaming applications
GitHub user zohar-mizrahi opened a pull request: https://github.com/apache/flink/pull/3826

[FLINK-5886] Python API for streaming applications

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zohar-mizrahi/flink python-streaming

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3826.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3826

commit 7d1b9f0bb2dea719dc438cb8649f200cc6235980 — Robert Metzger, 2017-01-03T09:05:54Z: [FLINK-4861][hotfix] Fix change-scala-version script for opt assembly
commit a6a5b21ef8c7fdd7d073296208f47d47ca6a — Sachin, 2017-01-03T10:28:11Z: [FLINK-5382][web-frontend] Fix problems with downloading TM logs on Yarn
commit 335175e6eefc260cf1600544639594d85836f7d8 — Ivan Mushketyk, 2016-12-16T07:56:46Z: [FLINK-5349] [docs] Fix typos in Twitter connector example. This closes #3015.
commit bb46fffe310f9cd6f667293df14d98e90011d591 — Abhishek R. Singh, 2016-12-14T14:05:11Z: [FLINK-5323] [docs] Replace CheckpointNotifier with CheckpointListener. This closes #3006.
commit 24109cb2692f1f0dd2b9f8c9c8dcc02e55148bab — zentol, 2016-11-25T12:27:43Z: [FLINK-4870] Fix path handling in ContinuousFileMonitoringFunction. This closes #2887.
commit b50bbcc8853c1c2ebcdba9c74a70bfdfbe6557ab — Boris Osipov, 2016-12-16T07:30:33Z: [FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape. This closes #3019.
commit 9c0c19aae5b78de71c91a735a76cd9196dc8482c — zentol, 2016-11-25T11:51:38Z: [FLINK-5160] Fix SecurityContextTest#testCreateInsecureHadoopContext on Windows. This closes #2888.
commit 91f9a1acaa899159a0d907528634bd246e6854b4 — Stephan Ewen, 2017-01-04T23:18:13Z: [FLINK-5408] [RocksDB backend] Uniquify RocksDB JNI library path to avoid multiple classloader problem
commit fb48c3b4cbc5a186cb7b812c8d05833c5852b385 — Stephan Ewen, 2017-01-05T13:44:00Z: [FLINK-4890] [core] Make GlobFilePathFilter work on Windows
commit 3554c96d118a411906a22b1f1087de073617a4c7 — Stefan Richter, 2016-12-28T11:50:00Z: [FLINK-5397] [runtime] Do not replace ObjectStreamClass on deserialization of migration package classes, override resolveClass(...) instead. This closes #3050
commit 700cbd464345e9c180cfef58a4082b2e39d27160 — Stephan Ewen, 2017-01-05T16:30:37Z: [hotfix] Set default test logger back to 'OFF' in 'flink-tests'
commit 65a32e74175c026f04ab058ee2ace8c9e7012d76 — zentol, 2017-01-05T14:39:25Z: [FLINK-5160] SecurityUtils use OperatingSystem.getCurrentOperatingSystem()
commit aaf9612791284633727a0951d0d45292ef5e233c — zentol, 2017-01-05T15:18:38Z: [FLINK-5412] Enable RocksDB tests on Windows OS
commit 9d99f2bd4a29b748905e55a774ff04f933b6b00f — Alexey Diomin, 2016-07-04T15:13:11Z: [FLINK-4148] Fix min distance calculation in QuadTree
commit b93f80afc7c50c7fefc850b620bb571523343595 — Sachin Goel, 2017-01-05T18:00:26Z: [FLINK-5119][web-frontend] Fix problems in displaying TM heartbeat and path.
commit 7251a4ca9539200ba4c894c116800fdc7cecca9f — Sachin Goel, 2017-01-05T16:55:58Z: [FLINK-5381][web-frontend] Fix scrolling issues
commit 7ff59fe7bfde0f537db0cefe0f94225c8a8d807b — zentol, 2017-01-02T14:29:56Z: [hotfix] RAT - Exclude test snapshots
commit 2aaa093461fbe8f4b1c4bb8da4be593c3ac7c2c0 — Shannon Carey, 2017-01-04T19:20:12Z: [docs] Clarify restart strategy defaults set
[jira] [Closed] (FLINK-6283) Enable to clear a given file cache path
[ https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zohar Mizrahi closed FLINK-6283.
Resolution: Won't Do

> Enable to clear a given file cache path
>
> Key: FLINK-6283
> URL: https://issues.apache.org/jira/browse/FLINK-6283
> Project: Flink
> Issue Type: Improvement
> Reporter: Zohar Mizrahi
> Assignee: Zohar Mizrahi
> Priority: Minor
>
> In the context of the FileCache functionality that is used within the
> distributed cache flow, add functionality to clear a given path.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path
[ https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972776#comment-15972776 ]

Zohar Mizrahi commented on FLINK-6283:
--
Yes, this is how I implemented it. In addition, I added a conditional check for whether the path exists (as far as I remember, to avoid an exception). But as you mentioned, nothing fancy here. I'll skip this change.
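The conditional existence check described in the comment above can be sketched in a few lines. This is a minimal Python illustration of a clearPath-style helper (hypothetical name and signature, not Flink's actual Java FileCache code):

```python
import os
import shutil

def clear_path(path):
    """Remove a cached file or folder if it exists; silently no-op otherwise.
    The existence check avoids raising when the path was never created."""
    if os.path.isdir(path):
        shutil.rmtree(path)   # recursively delete a cached directory
    elif os.path.isfile(path):
        os.remove(path)       # delete a single cached file
```

Calling it on a path that is already gone is safe, which is exactly why the check was added.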
[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path
[ https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972673#comment-15972673 ]

Zohar Mizrahi commented on FLINK-6283:
--
Yep, FileCache#copy, and that is why I also added FileCache#clearPath. Assuming the random number generator is strong enough, a given temporary folder would not repeat itself (so I may skip this change). However, I do have to make sure to clean up the temporary folder once the execution completes. We don't want to consume the storage/memory (depending on how the /tmp folder is mounted) over time.
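The cleanup concern raised above — removing the randomly named temporary folder once execution completes, so repeated runs do not accumulate data under /tmp — can be sketched like this. It is a Python illustration with a hypothetical helper name, not the actual Flink code:

```python
import atexit
import shutil
import tempfile

def create_run_tmp_dir(prefix="flink-python-"):
    """Create a uniquely (randomly) named temporary folder and register
    its removal at interpreter exit, so each run cleans up after itself."""
    path = tempfile.mkdtemp(prefix=prefix)
    # ignore_errors=True: the folder may already have been removed earlier
    atexit.register(shutil.rmtree, path, ignore_errors=True)
    return path
```

Registering the removal at creation time means the folder is reclaimed even if the job exits early.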
[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path
[ https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972635#comment-15972635 ]

Zohar Mizrahi commented on FLINK-6283:
--
In one case, when the Python files are prepared for client-side execution, a temporary folder (with a random number concatenated to its name) is created locally, outside the job scope. All the Python files (along with packages) are then copied to that temporary folder for local execution.
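The client-side staging described above — copying the script and its packages into a randomly named local folder — might look roughly like this. The helper name and signature are hypothetical, not the actual PythonStreamBinder code:

```python
import os
import shutil
import tempfile

def stage_python_files(main_script, extra_paths=()):
    """Copy the main script plus any accompanying files or package
    directories into a freshly created, randomly named local folder."""
    staging_dir = tempfile.mkdtemp(prefix="flink-staging-")
    shutil.copy(main_script, staging_dir)
    for p in extra_paths:
        dest = os.path.join(staging_dir, os.path.basename(p))
        if os.path.isdir(p):
            shutil.copytree(p, dest)   # copy a whole package directory
        else:
            shutil.copy(p, dest)       # copy a single module file
    return staging_dir
```

The random prefix from `mkdtemp` is what makes collisions between concurrent runs unlikely, as discussed in the comments above.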
[jira] [Commented] (FLINK-6283) Enable to clear a given file cache path
[ https://issues.apache.org/jira/browse/FLINK-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972346#comment-15972346 ]

Zohar Mizrahi commented on FLINK-6283:
--
It refers to a file cache created by a Python streaming application. For that purpose, it uses {{org.apache.flink.runtime.filecache.FileCache}}.
[jira] [Created] (FLINK-6283) Enable to clear a given file cache path
Zohar Mizrahi created FLINK-6283:

Summary: Enable to clear a given file cache path
Key: FLINK-6283
URL: https://issues.apache.org/jira/browse/FLINK-6283
Project: Flink
Issue Type: Improvement
Reporter: Zohar Mizrahi
Assignee: Zohar Mizrahi
Priority: Minor

In the context of the FileCache functionality that is used within the distributed cache flow, add functionality to clear a given path.
[jira] [Created] (FLINK-6177) Add support for "Distributed Cache" in streaming applications
Zohar Mizrahi created FLINK-6177:

Summary: Add support for "Distributed Cache" in streaming applications
Key: FLINK-6177
URL: https://issues.apache.org/jira/browse/FLINK-6177
Project: Flink
Issue Type: New Feature
Components: DataStream API
Reporter: Zohar Mizrahi
Assignee: Zohar Mizrahi
[jira] [Updated] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zohar Mizrahi updated FLINK-5886:
-
Description:
A work in progress to provide a Python interface for the Flink streaming APIs. The core technology is based on Jython and thus imposes two limitations: a. user-defined functions cannot use Python extensions; b. the Python version is 2.x.
The branch is based on Flink release 1.2.0, as can be found here: https://github.com/zohar-pm/flink/tree/python-streaming
In order to test it, one can use the IntelliJ IDE. Assuming IntelliJ was set up properly (see: https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, which in turn will execute all the tests under {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}

was:
A work in progress to provide a Python interface for the Flink streaming APIs. The core technology is based on Jython and thus imposes two limitations: a. user-defined functions cannot use Python extensions; b. the Python version is 2.x.
The branch is based on Flink release 1.2.0, as can be found here: https://github.com/zohar-pm/flink/tree/python-streaming
In order to test it, one can run `flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/jython/PythonStreamBinder.java` from the IntelliJ IDE (assuming the IDE is configured properly).
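Because the API runs user code on Jython, a user-defined function has to be pure Python: C-extension modules such as numpy are unavailable, so for example a tokenizing map function must rely only on built-in string operations. The following shows that UDF shape as an illustration only, not the actual Flink Python streaming API:

```python
class Tokenizer(object):
    """A map-style UDF that works under Jython because it uses only
    pure-Python constructs (no C-extension modules)."""
    def map(self, value):
        # split a line into lower-cased (word, 1) pairs
        return [(word.lower(), 1) for word in value.split()]
```

Any UDF importing a C extension would fail at runtime under Jython, which is the first limitation listed in the description.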
[jira] [Created] (FLINK-5992) Enable file registration at distributed cache in stream execution environment
Zohar Mizrahi created FLINK-5992:

Summary: Enable file registration at distributed cache in stream execution environment
Key: FLINK-5992
URL: https://issues.apache.org/jira/browse/FLINK-5992
Project: Flink
Issue Type: Improvement
Components: DataStream API
Reporter: Zohar Mizrahi

Create a new API in the stream execution environment to enable file registration at the distributed cache. This file will be accessible from any user-defined function in the (distributed) runtime under the given path.
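The idea can be illustrated with a toy registry: a file registered under a logical name on the client is later resolved by that same name inside a user-defined function. This is a hypothetical Python stand-in for the concept, not the actual StreamExecutionEnvironment API:

```python
class DistributedCacheRegistry(object):
    """Toy model of the proposed API: register a file under a logical name
    on the client; UDFs then look up the path by that name at runtime."""
    def __init__(self):
        self._files = {}

    def register_cached_file(self, file_path, name):
        if name in self._files:
            raise ValueError("cache name already registered: %s" % name)
        self._files[name] = file_path

    def get_cached_file(self, name):
        # in a real distributed setup, the runtime would have shipped the
        # file to each TaskManager and this would return the local copy
        return self._files[name]
```

The real implementation additionally has to ship the file to every TaskManager; the registry above only captures the name-to-path contract.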
[jira] [Created] (FLINK-5886) Python API for streaming applications
Zohar Mizrahi created FLINK-5886:

Summary: Python API for streaming applications
Key: FLINK-5886
URL: https://issues.apache.org/jira/browse/FLINK-5886
Project: Flink
Issue Type: New Feature
Components: Python API
Reporter: Zohar Mizrahi

A work in progress to provide a Python interface for the Flink streaming APIs. The core technology is based on Jython and thus imposes two limitations: a. user-defined functions cannot use Python extensions; b. the Python version is 2.x.
The branch is based on Flink release 1.2.0, as can be found here: https://github.com/zohar-pm/flink/tree/python-streaming
In order to test it, one can run `flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/jython/PythonStreamBinder.java` from the IntelliJ IDE (assuming the IDE is configured properly).