markap14 commented on code in PR #7003: URL: https://github.com/apache/nifi/pull/7003#discussion_r1163223551
########## nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java: ########## @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.py4j; + +import org.apache.nifi.py4j.client.JavaObjectBindings; +import org.apache.nifi.py4j.client.NiFiPythonGateway; +import org.apache.nifi.py4j.client.StandardPythonClient; +import org.apache.nifi.py4j.server.NiFiGatewayServer; +import org.apache.nifi.python.ControllerServiceTypeLookup; +import org.apache.nifi.python.PythonController; +import org.apache.nifi.python.PythonProcessConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import py4j.CallbackClient; +import py4j.GatewayServer; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +// TODO / Figure Out for MVP: +// MUST DO: +// - Documentation +// - Admin Guide +// - JavaDocs +// - Developer Guide +// - Explain how communication between Java & Python work. +// - Java is preferred, Python is slower and more expensive b/c of network +// - Different Extension Points (FlowFileTransform, RecordTransform) +// - What the API Looks like, Links to JavaDocs for ProcessContext, etc. +// - Example Code +// - Exposing properties +// - Relationships +// - Controller Services +// - Need to update docs to show the interfaces that are exposed, explain how to get these... +// - Design Doc +// - Setup proper logging on the Python side: https://docs.python.org/2/howto/logging-cookbook.html#using-file-rotation +// - For FlowFileTransform, allow the result to contain either a byte array or a String. If a String, just convert in the parent class. +// - Figure out how to deal with Python Packaging +// - Need to figure out how to deal with additionalDetails.html, docs directory in python project typically? +// - Understand how to deal with versioning +// - Look at performance improvements for Py4J - socket comms appear to be INCREDIBLY slow. +// - Create test that calls Python 1M times. Just returns 'hello'. See how long it takes +// - Create test that calls Python 1M times. Returns <java object>.toString() and see how long it takes. +// - Will help to understand if it's the call from Java to Python that's slow, Python to Java, or both. +// - Performance concern for TransformRecord +// - Currently, triggering the transform() method is pretty fast. But then the Result object comes back and we have to call into the Python side to call the getters +// over and over. Need to look into instead serializing the entire response as JSON and sending that back. +// - Also, since this is heavy JSON processing, might want to consider ORJSON or something like that instead of inbuilt JSON parser/generator +// - Test pip install nifi-my-proc, does nifi pick it up? +// - When ran DetectObjectInImage with multiple threads, Python died. Need to figure out why. +// - If Python Process dies, need to create a new process and need to then create all of the Processors that were in that Process and initialize them. +// - Milestone 2 or 3, not Milestone 1. +// - Remove test-pypi usage from ExtensionManager.py +// - Additional Interfaces beyond just FlowFileTransform +// - FlowFileSource +// - Restructure Maven projects +// - Should this all go under Framework? +// +// +// CONSIDER: +// - Clustering: Ensure component on all nodes? +// - Consider "pip freeze" type of thing to ensure that python dependencies are same across nodes when joining cluster. +// - Update python code to use python_style_method_names instead of javaStyleMethodNames +// - Also add 'failure' and 'original' relationships to FlowFileTransform +// +// +// Can punt for now: +// - We have an issue with objects created from Processor calling into Java. Is fine when we provide objects to Python but when +// it makes a callback, those objects are bound and never unbound!!! +// *** This appears to be fine as long as on the Python side we set manage_memory=True *** +// But it does cut perf in half. May be room for improvement somehow? By implementing a 'bulk delete' custom command? +// - Appears to still be issues with timeout in nifi when set to 10 secs... initially will probably just leave default of 0sec. + +public class PythonProcess { + private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class); + private static final String PYTHON_CONTROLLER_FILENAME = "Controller.py"; + + private final PythonProcessConfig processConfig; + private final ControllerServiceTypeLookup controllerServiceTypeLookup; + private final File virtualEnvHome; + private GatewayServer server; + private PythonController controller; + private Process process; + private NiFiPythonGateway gateway; + private final Map<String, Boolean> processorPrefersIsolation = new ConcurrentHashMap<>(); + + + public PythonProcess(final PythonProcessConfig processConfig, final ControllerServiceTypeLookup controllerServiceTypeLookup, final File virtualEnvHome) { + this.processConfig = processConfig; + this.controllerServiceTypeLookup = controllerServiceTypeLookup; + this.virtualEnvHome = virtualEnvHome; + } + + public PythonController getController() { + return controller; + } + + public void start() throws IOException { + // TODO: Look into using configured TLS Certs to make this secure by default. + final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault(); + final SocketFactory socketFactory = SocketFactory.getDefault(); + + final int timeoutMillis = (int) processConfig.getCommsTimeout().toMillis(); + final String authToken = null; + final CallbackClient callbackClient = new CallbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), authToken, + 50000L, TimeUnit.MILLISECONDS, socketFactory, false, timeoutMillis); + + final JavaObjectBindings bindings = new JavaObjectBindings(); + gateway = new NiFiPythonGateway(bindings, null, callbackClient); + gateway.startup(); + + server = new NiFiGatewayServer(gateway, + 0, + GatewayServer.defaultAddress(), + timeoutMillis, + timeoutMillis, + Collections.emptyList(), + serverSocketFactory); + server.start(); + + final int listeningPort = server.getListeningPort(); + + setupEnvironment(); + this.process = launchPythonProcess(listeningPort); + + final StandardPythonClient pythonClient = new StandardPythonClient(gateway); + controller = pythonClient.getController(); + + final long timeout = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L); + Exception lastException = null; + boolean pingSuccessful = false; + while (System.currentTimeMillis() < timeout) { + try { + final String pingResponse = controller.ping(); + pingSuccessful = "pong".equals(pingResponse); Review Comment: Yeah, I believe this can be clarified. I like the String over boolean because `boolean` can also be inferred from just about anything - null or not, 0 vs 1, etc. and may not convey the intent as clearly. But I do think that we should document in the interface very clearly that it is expected to return the String value `pong`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org