dam4rus commented on code in PR #6838:
URL: https://github.com/apache/nifi/pull/6838#discussion_r1071866456


##########
nifi-docs/src/main/asciidoc/toolkit-guide.adoc:
##########
@@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to 
ZooKeeper 3.5.x, the migrator
 * For a ZooKeeper using Kerberos for authentication:
 ** `zk-migrator.sh -s -z 
destinationHostname:destinationClientPort/destinationRootPath/components -k 
/path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
 
-6. Once the migration has completed successfully, start the processors in the 
NiFi flow.  Processing should continue from the point at which it was stopped 
when the NiFi flow was stopped.
\ No newline at end of file
+6. Once the migration has completed successfully, start the processors in the 
NiFi flow.  Processing should continue from the point at which it was stopped 
when the NiFi flow was stopped.
+
+[[kafka_migrator]]
+== Kafka Processor Migrator
+With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 
were removed.
+In large flows having many numbers of components it is challenging to replace 
these processors manually.
+This tool can be used to update a flow in an automated way.
+
+=== Usage
+Running the script requires 3 mandatory and 1 optional parameters:
+
+* Input file, the full path of the flow.xml.gz in which the replacement is 
required.
+* Output file, the full path of the file where the results should be saved.
+* Transaction, whether the new processors should be configured with or without 
transaction usage.
+* Kafka Brokers, a comma separated list of Kafka Brokers in <host>:<port> 
format.
+
+Different input and output files must be used.
+Kafka Broker argument can be omitted if flow does not contain GetKafka or 
PutKafka processors.
+
+1. Run script, a possible example:
+
+ ./bin/kafka-migrator.sh "/tmp/flow/flow.xml.gz" 
"/tmp/flow/flow_result.xml.gz" false "mykafkaserver1:1234,mykafkaserver2:1235"
+
+2. Rename flow_result.xml.gz file to flow.xml.gz, do not overwrite your input 
file.
+3. Copy flow.xml.gz file to all the NiFi nodes conf directory
+4. Start NiFi
+5. Verify the results.
+
+=== Expected Behaviour
+* Flow replacement:
+* For all replaced processors:
+** changing class and artifact
+** configure transaction as true
+*** 'Delivery Guarantee' property will be set to 'Replicated'
+*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in 
the file they will be set to true
+*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be 
translated as true in NiFi
+** configure transaction as false
+*** 'Delivery Guarantee' property will keep its original setting.
+*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
+* For version 8.0 processors (when kafka broker list argument provided)

Review Comment:
   Didn't you mean version 0.8 processor? Or version 8.0 of what?



##########
nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaMigrationService;
+import org.apache.nifi.xml.processing.parsers.DocumentProvider;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.apache.nifi.xml.processing.transform.StandardTransformProvider;
+import org.w3c.dom.Document;
+
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class KafkaMigratorMain {
+
+    private static void printUsage() {
+        System.out.println("This application replaces Kafka processors from 
version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" +
+                " in a flow.xml.gz file.");
+        System.out.println("\n");
+        System.out.println("Usage: kafka-migrator.sh <path to input 
flow.xml.gz> <path to output flow.xml.gz>" +
+                " <use transaction true or false>\n<optional: coma separated 
kafka brokers in <host>:<port> format. " +

Review Comment:
   ```suggestion
                   " <use transaction true or false>\n<optional: comma 
separated kafka brokers in <host>:<port> format. " +
   ```



##########
nifi-docs/src/main/asciidoc/toolkit-guide.adoc:
##########
@@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to 
ZooKeeper 3.5.x, the migrator
 * For a ZooKeeper using Kerberos for authentication:
 ** `zk-migrator.sh -s -z 
destinationHostname:destinationClientPort/destinationRootPath/components -k 
/path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
 
-6. Once the migration has completed successfully, start the processors in the 
NiFi flow.  Processing should continue from the point at which it was stopped 
when the NiFi flow was stopped.
\ No newline at end of file
+6. Once the migration has completed successfully, start the processors in the 
NiFi flow.  Processing should continue from the point at which it was stopped 
when the NiFi flow was stopped.
+
+[[kafka_migrator]]
+== Kafka Processor Migrator
+With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 
were removed.
+In large flows having many numbers of components it is challenging to replace 
these processors manually.
+This tool can be used to update a flow in an automated way.
+
+=== Usage
+Running the script requires 3 mandatory and 1 optional parameters:

Review Comment:
   I get that the script only accepts an argument list and no options, so "it's 
obvious" that the Kafka Brokers are the optional parameter but I would still 
mark it as optional in the documentation



##########
nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaMigrationService;
+import org.apache.nifi.xml.processing.parsers.DocumentProvider;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.apache.nifi.xml.processing.transform.StandardTransformProvider;
+import org.w3c.dom.Document;
+
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class KafkaMigratorMain {
+
+    private static void printUsage() {
+        System.out.println("This application replaces Kafka processors from 
version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" +
+                " in a flow.xml.gz file.");
+        System.out.println("\n");
+        System.out.println("Usage: kafka-migrator.sh <path to input 
flow.xml.gz> <path to output flow.xml.gz>" +
+                " <use transaction true or false>\n<optional: coma separated 
kafka brokers in <host>:<port> format. " +
+                "Required for version 0.8 processors only>");
+    }
+
+    public static void main(final String[] args) throws Exception {

Review Comment:
   I'm kinda worried about using only arguments (arg1 argN...) instead of 
options (--input-file <file> --output-file <file>). Is there any chance this 
script will require additional required arguments in the future? It would be 
really weird to introduce a new required argument since the optional arguments 
index should change or make them into required. It's "fine" if we are 100% sure 
that this script will never change but I would still take this into 
consideration



##########
nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.util.Map;
+
+public class KafkaMigrationService {

Review Comment:
   I think this class hierarchy is kind of confusing for multiple reasons. This 
class feels like it should be an interface with these methods:
   
   * `String getPath();`
   * `String getPathForClass();`
   * `Migrator createPublishMigrator(final Map<String, String> arguments);`
   * `Migrator createConsumeMigrator(final Map<String, String> arguments);`
   * `Migrator createVersionEightPublishMigrator(final Map<String, String> 
arguments);`
   * `Migrator createVersionEightConsumeMigrator(final Map<String, String> 
arguments);`
    
   Then remove `void replaceKafkaProcessors` and make `void replaceProcessors` 
a `default` method. This way you can get rid of some weird patterns, like 
creating a derived instance in a base class method, the need to override 
`replaceKafkaProcessors` only to call `replaceProcessors`, returning `null` in 
`createXMigrator` methods, etc.



##########
nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import 
org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathExpressionException;
+import java.util.Map;
+
+public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator {
+    private static final String XPATH_FOR_TRANSACTION_PROPERTY = 
"property[name=\"honor-transactions\"]/value";
+    private static final String TRANSACTION_TAG_NAME = "honor-transactions";
+
+    public ConsumeKafkaFlowMigrator(final Map<String, String> arguments, final 
boolean isVersion8Processor) {
+        super(arguments, isVersion8Processor,
+                new KafkaProcessorDescriptor("Consume"),
+                "property",
+                "name", "property",
+                XPATH_FOR_TRANSACTION_PROPERTY, TRANSACTION_TAG_NAME);
+    }
+
+    @Override
+    public void configureProperties(final Node node) throws 
XPathExpressionException {

Review Comment:
   Is there a reason you explicitly override this method? If you only call the 
super method you can omit this override



##########
nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import 
org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathExpressionException;
+import java.util.Map;
+
+public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator {
+    private static final String XPATH_FOR_TRANSACTION_PROPERTY = 
"property[name=\"honor-transactions\"]/value";
+    private static final String TRANSACTION_TAG_NAME = "honor-transactions";
+
+    public ConsumeKafkaFlowMigrator(final Map<String, String> arguments, final 
boolean isVersion8Processor) {
+        super(arguments, isVersion8Processor,
+                new KafkaProcessorDescriptor("Consume"),
+                "property",

Review Comment:
   `new KafkaProcessorDescriptor("Consume"), "property", "name", "property"` 
arguments are duplicated in ConsumeKafkaTemplateFlowMigrator. Please consider 
either extracting them to constants or change the signature of the base 
constructor to accept some kind of "descriptor" class so you can nicely wrap 
them into a constant instance.



##########
nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaMigrationService;
+import org.apache.nifi.xml.processing.parsers.DocumentProvider;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.apache.nifi.xml.processing.transform.StandardTransformProvider;
+import org.w3c.dom.Document;
+
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class KafkaMigratorMain {
+
+    private static void printUsage() {
+        System.out.println("This application replaces Kafka processors from 
version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" +
+                " in a flow.xml.gz file.");
+        System.out.println("\n");
+        System.out.println("Usage: kafka-migrator.sh <path to input 
flow.xml.gz> <path to output flow.xml.gz>" +
+                " <use transaction true or false>\n<optional: coma separated 
kafka brokers in <host>:<port> format. " +
+                "Required for version 0.8 processors only>");
+    }
+
+    public static void main(final String[] args) throws Exception {
+        if (helpRequested(args)) {
+            printUsage();
+            return;
+        }
+
+        final String input = args[0];
+        final String output = args[1];
+        if (input.equalsIgnoreCase(output)) {
+            System.out.println("Input and output files should be different.");
+            return;
+        }
+        final String transaction = args[2];
+        if (!(transaction.equalsIgnoreCase("true") || 
transaction.equalsIgnoreCase("false"))) {
+            System.out.println("Transaction argument should be either true or 
false.");
+            return;
+        }
+        String kafkaBrokers = "";
+        if (args.length == 4) {
+            if (args[3].matches(".+:\\d+")) {
+                kafkaBrokers = args[3];
+            } else {
+                System.out.println("Kafka Brokers must be in a <host>:<port> 
format, can be separated by coma. " +
+                        "For example: hostname:1234, host:5678");
+                return;
+            }
+        }
+
+        final Map<String, String> arguments = new HashMap<>();

Review Comment:
   I think you should rather create a class that wraps the arguments. It would 
make things more obvious down the line.



##########
nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.util.Map;
+
+public class KafkaMigrationService {
+
+    private final static String REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES = 
"(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?";
+    private final static String NEW_KAFKA_PROCESSOR_VERSION = "_2_0";
+    private final static String ARTIFACT = "nifi-kafka-2-0-nar";
+    private final static String PATH_FOR_ARTIFACT = "bundle/artifact";
+    protected static final boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
+    protected static final boolean IS_NOT_VERSION_EIGHT_PROCESSOR = 
Boolean.FALSE;
+
+    protected String path;
+    protected String pathForClass;
+
+    public void replaceKafkaProcessors(final Document document, final 
Map<String, String> arguments) throws XPathExpressionException {

Review Comment:
   I would move the body of this method to an "unrelated" class or just inline 
it at it's call site.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to