markap14 commented on code in PR #7003:
URL: https://github.com/apache/nifi/pull/7003#discussion_r1163223551


##########
nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/py4j/PythonProcess.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.py4j;
+
+import org.apache.nifi.py4j.client.JavaObjectBindings;
+import org.apache.nifi.py4j.client.NiFiPythonGateway;
+import org.apache.nifi.py4j.client.StandardPythonClient;
+import org.apache.nifi.py4j.server.NiFiGatewayServer;
+import org.apache.nifi.python.ControllerServiceTypeLookup;
+import org.apache.nifi.python.PythonController;
+import org.apache.nifi.python.PythonProcessConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import py4j.CallbackClient;
+import py4j.GatewayServer;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+// TODO / Figure Out for MVP:
+//      MUST DO:
+//      - Documentation
+//          - Admin Guide
+//          - JavaDocs
+//          - Developer Guide
+//              - Explain how communication between Java & Python work.
+//              - Java is preferred, Python is slower and more expensive b/c of network
+//              - Different Extension Points (FlowFileTransform, RecordTransform)
+//                  - What the API Looks like, Links to JavaDocs for ProcessContext, etc.
+//                  - Example Code
+//              - Exposing properties
+//              - Relationships
+//              - Controller Services
+//                  - Need to update docs to show the interfaces that are exposed, explain how to get these...
+//          - Design Doc
+//      - Setup proper logging on the Python side: https://docs.python.org/2/howto/logging-cookbook.html#using-file-rotation
+//      - For FlowFileTransform, allow the result to contain either a byte array or a String. If a String, just convert in the parent class.
+//      - Figure out how to deal with Python Packaging
+//              - Need to figure out how to deal with additionalDetails.html, docs directory in python project typically?
+//              - Understand how to deal with versioning
+//      - Look at performance improvements for Py4J - socket comms appear to be INCREDIBLY slow.
+//              - Create test that calls Python 1M times. Just returns 'hello'. See how long it takes
+//              - Create test that calls Python 1M times. Returns <java object>.toString() and see how long it takes.
+//              - Will help to understand if it's the call from Java to Python that's slow, Python to Java, or both.
+//      - Performance concern for TransformRecord
+//              - Currently, triggering the transform() method is pretty fast. But then the Result object comes back and we have to call into the Python side to call the getters
+//                over and over. Need to look into instead serializing the entire response as JSON and sending that back.
+//              - Also, since this is heavy JSON processing, might want to consider ORJSON or something like that instead of inbuilt JSON parser/generator
+//      - Test pip install nifi-my-proc, does nifi pick it up?
+//      - When ran DetectObjectInImage with multiple threads, Python died. Need to figure out why.
+//      - If Python Process dies, need to create a new process and need to then create all of the Processors that were in that Process and initialize them.
+//            - Milestone 2 or 3, not Milestone 1.
+//      - Remove test-pypi usage from ExtensionManager.py
+//      - Additional Interfaces beyond just FlowFileTransform
+//          - FlowFileSource
+//      - Restructure Maven projects
+//          - Should this all go under Framework?
+//
+//
+//      CONSIDER:
+//      - Clustering: Ensure component on all nodes?
+//          - Consider "pip freeze" type of thing to ensure that python dependencies are same across nodes when joining cluster.
+//      - Update python code to use python_style_method_names instead of javaStyleMethodNames
+//      - Also add 'failure' and 'original' relationships to FlowFileTransform
+//
+//
+//      Can punt for now:
+//      - We have an issue with objects created from Processor calling into Java. Is fine when we provide objects to Python but when
+//          it makes a callback, those objects are bound and never unbound!!!
+//              *** This appears to be fine as long as on the Python side we set manage_memory=True ***
+//                  But it does cut perf in half. May be room for improvement somehow? By implementing a 'bulk delete' custom command?
+//      - Appears to still be issues with timeout in nifi when set to 10 secs... initially will probably just leave default of 0sec.
+
+public class PythonProcess {
+    private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class);
+    private static final String PYTHON_CONTROLLER_FILENAME = "Controller.py";
+
+    private final PythonProcessConfig processConfig;
+    private final ControllerServiceTypeLookup controllerServiceTypeLookup;
+    private final File virtualEnvHome;
+    private GatewayServer server;
+    private PythonController controller;
+    private Process process;
+    private NiFiPythonGateway gateway;
+    private final Map<String, Boolean> processorPrefersIsolation = new ConcurrentHashMap<>();
+
+
+    public PythonProcess(final PythonProcessConfig processConfig, final ControllerServiceTypeLookup controllerServiceTypeLookup, final File virtualEnvHome) {
+        this.processConfig = processConfig;
+        this.controllerServiceTypeLookup = controllerServiceTypeLookup;
+        this.virtualEnvHome = virtualEnvHome;
+    }
+
+    public PythonController getController() {
+        return controller;
+    }
+
+    public void start() throws IOException {
+        // TODO: Look into using configured TLS Certs to make this secure by default.
+        final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
+        final SocketFactory socketFactory = SocketFactory.getDefault();
+
+        final int timeoutMillis = (int) processConfig.getCommsTimeout().toMillis();
+        final String authToken = null;
+        final CallbackClient callbackClient = new CallbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(), authToken,
+            50000L, TimeUnit.MILLISECONDS, socketFactory, false, timeoutMillis);
+
+        final JavaObjectBindings bindings = new JavaObjectBindings();
+        gateway = new NiFiPythonGateway(bindings, null, callbackClient);
+        gateway.startup();
+
+        server = new NiFiGatewayServer(gateway,
+            0,
+            GatewayServer.defaultAddress(),
+            timeoutMillis,
+            timeoutMillis,
+            Collections.emptyList(),
+            serverSocketFactory);
+        server.start();
+
+        final int listeningPort = server.getListeningPort();
+
+        setupEnvironment();
+        this.process = launchPythonProcess(listeningPort);
+
+        final StandardPythonClient pythonClient = new StandardPythonClient(gateway);
+        controller = pythonClient.getController();
+
+        final long timeout = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L);
+        Exception lastException = null;
+        boolean pingSuccessful = false;
+        while (System.currentTimeMillis() < timeout) {
+            try {
+                final String pingResponse = controller.ping();
+                pingSuccessful = "pong".equals(pingResponse);

Review Comment:
   Yeah, I believe this can be clarified. I prefer the String over a `boolean` 
because a `boolean` can be inferred from just about anything (null or not, 
0 vs. 1, etc.) and so may not convey the intent as clearly. But I do think we 
should document very clearly in the interface that `ping()` is expected to 
return the String value `pong`.
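
   A minimal sketch of what such a documented contract could look like. The names `PingContractSketch`, `Pingable`, `EXPECTED_PING_RESPONSE`, and `isPingSuccessful` are hypothetical and used only for illustration; they are not the actual `PythonController` interface from this PR. The null-safe comparison mirrors the `"pong".equals(pingResponse)` check in the diff above.

```java
public class PingContractSketch {

    /** Hypothetical stand-in for the controller interface; not the actual NiFi API. */
    public interface Pingable {
        /**
         * Verifies that the Python side is reachable and responsive.
         *
         * @return the exact String "pong"; any other value, including null,
         *         means the ping did not succeed
         */
        String ping();
    }

    /** The only response that counts as a successful ping (assumed from the review discussion). */
    public static final String EXPECTED_PING_RESPONSE = "pong";

    /** Null-safe check: a null or unexpected response is treated as a failed ping. */
    public static boolean isPingSuccessful(final Pingable controller) {
        return EXPECTED_PING_RESPONSE.equals(controller.ping());
    }

    public static void main(final String[] args) {
        System.out.println(isPingSuccessful(() -> "pong")); // exact match
        System.out.println(isPingSuccessful(() -> "PONG")); // wrong case is not accepted
        System.out.println(isPingSuccessful(() -> null));   // null is not accepted
    }
}
```

   Keeping the expected value in one constant, and stating it in the Javadoc, means a caller does not have to guess which "truthy" responses count as success.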



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.