dam4rus commented on code in PR #6838: URL: https://github.com/apache/nifi/pull/6838#discussion_r1071866456
########## nifi-docs/src/main/asciidoc/toolkit-guide.adoc: ########## @@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to ZooKeeper 3.5.x, the migrator * For a ZooKeeper using Kerberos for authentication: ** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json` -6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. \ No newline at end of file +6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. + +[[kafka_migrator]] +== Kafka Processor Migrator +With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 were removed. +In large flows having many numbers of components it is challenging to replace these processors manually. +This tool can be used to update a flow in an automated way. + +=== Usage +Running the script requires 3 mandatory and 1 optional parameters: + +* Input file, the full path of the flow.xml.gz in which the replacement is required. +* Output file, the full path of the file where the results should be saved. +* Transaction, whether the new processors should be configured with or without transaction usage. +* Kafka Brokers, a comma separated list of Kafka Brokers in <host>:<port> format. + +Different input and output files must be used. +Kafka Broker argument can be omitted if flow does not contain GetKafka or PutKafka processors. + +1. Run script, a possible example: + + ./bin/kafka-migrator.sh "/tmp/flow/flow.xml.gz" "/tmp/flow/flow_result.xml.gz" false "mykafkaserver1:1234,mykafkaserver2:1235" + +2. Rename flow_result.xml.gz file to flow.xml.gz, do not overwrite your input file. +3. 
Copy flow.xml.gz file to all the NiFi nodes conf directory +4. Start NiFi +5. Verify the results. + +=== Expected Behaviour +* Flow replacement: +* For all replaced processors: +** changing class and artifact +** configure transaction as true +*** 'Delivery Guarantee' property will be set to 'Replicated' +*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in the file they will be set to true +*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be translated as true in NiFi +** configure transaction as false +*** 'Delivery Guarantee' property will keep its original setting. +*** 'Honor-Transactions' and 'Use-Transactions' will be set to false +* For version 8.0 processors (when kafka broker list argument provided) Review Comment: Didn't you mean version 0.8 processor? Or version 8.0 of what? ########## nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.toolkit.kafkamigrator.service.KafkaMigrationService; +import org.apache.nifi.xml.processing.parsers.DocumentProvider; +import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider; +import org.apache.nifi.xml.processing.transform.StandardTransformProvider; +import org.w3c.dom.Document; + +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class KafkaMigratorMain { + + private static void printUsage() { + System.out.println("This application replaces Kafka processors from version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" + + " in a flow.xml.gz file."); + System.out.println("\n"); + System.out.println("Usage: kafka-migrator.sh <path to input flow.xml.gz> <path to output flow.xml.gz>" + + " <use transaction true or false>\n<optional: coma separated kafka brokers in <host>:<port> format. " + Review Comment: ```suggestion " <use transaction true or false>\n<optional: comma separated kafka brokers in <host>:<port> format. " + ``` ########## nifi-docs/src/main/asciidoc/toolkit-guide.adoc: ########## @@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to ZooKeeper 3.5.x, the migrator * For a ZooKeeper using Kerberos for authentication: ** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json` -6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. \ No newline at end of file +6. 
Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. + +[[kafka_migrator]] +== Kafka Processor Migrator +With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 were removed. +In large flows having many numbers of components it is challenging to replace these processors manually. +This tool can be used to update a flow in an automated way. + +=== Usage +Running the script requires 3 mandatory and 1 optional parameters: Review Comment: I get that the script only accepts an argument list and no options, so "it's obvious" that the Kafka Brokers are the optional parameter but I would still mark it as optional in the documentation ########## nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.toolkit.kafkamigrator.service.KafkaMigrationService; +import org.apache.nifi.xml.processing.parsers.DocumentProvider; +import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider; +import org.apache.nifi.xml.processing.transform.StandardTransformProvider; +import org.w3c.dom.Document; + +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class KafkaMigratorMain { + + private static void printUsage() { + System.out.println("This application replaces Kafka processors from version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" + + " in a flow.xml.gz file."); + System.out.println("\n"); + System.out.println("Usage: kafka-migrator.sh <path to input flow.xml.gz> <path to output flow.xml.gz>" + + " <use transaction true or false>\n<optional: coma separated kafka brokers in <host>:<port> format. " + + "Required for version 0.8 processors only>"); + } + + public static void main(final String[] args) throws Exception { Review Comment: I'm kinda worried about using only arguments (arg1 argN...) instead of options (--input-file <file> --output-file <file>). Is there any chance this script will require additional required arguments in the future? It would be really weird to introduce a new required argument since the optional arguments index should change or make them into required. 
It's "fine" if we are 100% sure that this script will never change but I would still take this into consideration ########## nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.service; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import java.util.Map; + +public class KafkaMigrationService { Review Comment: I think this class hierarchy is kind of confusing for multiple reasons. 
This class feels like it should be an interface with these methods: * `String getPath();` * `String getPathForClass();` * `Migrator createPublishMigrator(final Map<String, String> arguments);` * `Migrator createConsumeMigrator(final Map<String, String> arguments);` * `Migrator createVersionEightPublishMigrator(final Map<String, String> arguments);` * `Migrator createVersionEightConsumeMigrator(final Map<String, String> arguments);` Then remove `void replaceKafkaProcessors` and make `void replaceProcessors` a `default` method. This way you can get rid of some weird patterns, like creating a derived instance in a base class method, the need to override `replaceKafkaProcessors` only to call `replaceProcessors`, returning `null` in `createXMigrator` methods, etc. ########## nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPathExpressionException; +import java.util.Map; + +public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator { + private static final String XPATH_FOR_TRANSACTION_PROPERTY = "property[name=\"honor-transactions\"]/value"; + private static final String TRANSACTION_TAG_NAME = "honor-transactions"; + + public ConsumeKafkaFlowMigrator(final Map<String, String> arguments, final boolean isVersion8Processor) { + super(arguments, isVersion8Processor, + new KafkaProcessorDescriptor("Consume"), + "property", + "name", "property", + XPATH_FOR_TRANSACTION_PROPERTY, TRANSACTION_TAG_NAME); + } + + @Override + public void configureProperties(final Node node) throws XPathExpressionException { Review Comment: Is there a reason you explicitly override this method? If you only call the super method you can omit this override ########## nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPathExpressionException; +import java.util.Map; + +public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator { + private static final String XPATH_FOR_TRANSACTION_PROPERTY = "property[name=\"honor-transactions\"]/value"; + private static final String TRANSACTION_TAG_NAME = "honor-transactions"; + + public ConsumeKafkaFlowMigrator(final Map<String, String> arguments, final boolean isVersion8Processor) { + super(arguments, isVersion8Processor, + new KafkaProcessorDescriptor("Consume"), + "property", Review Comment: `new KafkaProcessorDescriptor("Consume"), "property", "name", "property"` arguments are duplicated in ConsumeKafkaTemplateFlowMigrator. Please consider either extracting them to constants or change the signature of the base constructor to accept some kind of "descriptor" class so you can nicely wrap them into a constant instance. ########## nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.toolkit.kafkamigrator.service.KafkaMigrationService; +import org.apache.nifi.xml.processing.parsers.DocumentProvider; +import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider; +import org.apache.nifi.xml.processing.transform.StandardTransformProvider; +import org.w3c.dom.Document; + +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class KafkaMigratorMain { + + private static void printUsage() { + System.out.println("This application replaces Kafka processors from version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" + + " in a flow.xml.gz file."); + System.out.println("\n"); + System.out.println("Usage: kafka-migrator.sh <path to input flow.xml.gz> <path to output flow.xml.gz>" + + " <use transaction true or false>\n<optional: coma separated kafka brokers in <host>:<port> format. 
" + + "Required for version 0.8 processors only>"); + } + + public static void main(final String[] args) throws Exception { + if (helpRequested(args)) { + printUsage(); + return; + } + + final String input = args[0]; + final String output = args[1]; + if (input.equalsIgnoreCase(output)) { + System.out.println("Input and output files should be different."); + return; + } + final String transaction = args[2]; + if (!(transaction.equalsIgnoreCase("true") || transaction.equalsIgnoreCase("false"))) { + System.out.println("Transaction argument should be either true or false."); + return; + } + String kafkaBrokers = ""; + if (args.length == 4) { + if (args[3].matches(".+:\\d+")) { + kafkaBrokers = args[3]; + } else { + System.out.println("Kafka Brokers must be in a <host>:<port> format, can be separated by coma. " + + "For example: hostname:1234, host:5678"); + return; + } + } + + final Map<String, String> arguments = new HashMap<>(); Review Comment: I think you should rather create a class that wraps the arguments. It would make things more obvious down the line. ########## nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.service; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import java.util.Map; + +public class KafkaMigrationService { + + private final static String REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES = "(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?"; + private final static String NEW_KAFKA_PROCESSOR_VERSION = "_2_0"; + private final static String ARTIFACT = "nifi-kafka-2-0-nar"; + private final static String PATH_FOR_ARTIFACT = "bundle/artifact"; + protected static final boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE; + protected static final boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE; + + protected String path; + protected String pathForClass; + + public void replaceKafkaProcessors(final Document document, final Map<String, String> arguments) throws XPathExpressionException { Review Comment: I would move the body of this method to an "unrelated" class or just inline it at its call site.
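The interface refactor suggested in the KafkaMigrationService review comment above can be sketched as follows. This is only an illustration of the proposed shape, not code from the PR: the method names follow the review comment, while the `Migrator` stand-in and all bodies are assumptions.

```java
import org.w3c.dom.Document;
import org.w3c.dom.Node;

import javax.xml.xpath.XPathExpressionException;
import java.util.Map;

// Stand-in for the toolkit's Migrator type, reduced to what this sketch needs.
interface Migrator {
    void migrate(Node processorNode) throws XPathExpressionException;
}

// The shape the review suggests: KafkaMigrationService as an interface whose
// implementations only supply XPaths and migrator factories, while the shared
// replacement loop (formerly replaceKafkaProcessors/replaceProcessors) becomes
// a single default method.
interface KafkaMigrationService {

    String getPath();

    String getPathForClass();

    Migrator createPublishMigrator(Map<String, String> arguments);

    Migrator createConsumeMigrator(Map<String, String> arguments);

    Migrator createVersionEightPublishMigrator(Map<String, String> arguments);

    Migrator createVersionEightConsumeMigrator(Map<String, String> arguments);

    // Shared default behaviour: no base class instantiates a derived type,
    // no factory has to return null, and no subclass overrides this method
    // only to delegate back to it.
    default void replaceProcessors(final Document document, final Map<String, String> arguments)
            throws XPathExpressionException {
        // Select nodes via getPath(), pick the matching factory per processor
        // class, then hand each node to the resulting Migrator (loop elided).
    }
}
```

With this layout a flow-specific implementation overrides only the factory and path methods, which removes the pattern of a base class creating a derived instance inside its own method.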
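The review comment on `KafkaMigratorMain` asks for a class that wraps the positional arguments instead of a `Map<String, String>`. A minimal sketch of such a wrapper, reusing the validation rules already inlined in `main()` (distinct input/output files, strict true/false transaction flag, optional `<host>:<port>` broker list), might look like this. The class and accessor names are hypothetical, not from the PR:

```java
// Hypothetical argument wrapper: one immutable object carries the validated
// inputs, so downstream code cannot mix up map keys or re-validate.
final class MigrationArguments {

    private final String inputFile;
    private final String outputFile;
    private final boolean useTransactions;
    private final String kafkaBrokers;

    private MigrationArguments(final String inputFile, final String outputFile,
                               final boolean useTransactions, final String kafkaBrokers) {
        this.inputFile = inputFile;
        this.outputFile = outputFile;
        this.useTransactions = useTransactions;
        this.kafkaBrokers = kafkaBrokers;
    }

    // Mirrors the checks currently spread through main().
    static MigrationArguments parse(final String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException("Input file, output file and transaction arguments are required.");
        }
        if (args[0].equalsIgnoreCase(args[1])) {
            throw new IllegalArgumentException("Input and output files should be different.");
        }
        if (!args[2].equalsIgnoreCase("true") && !args[2].equalsIgnoreCase("false")) {
            throw new IllegalArgumentException("Transaction argument should be either true or false.");
        }
        String brokers = "";
        if (args.length == 4) {
            if (!args[3].matches(".+:\\d+")) {
                throw new IllegalArgumentException("Kafka Brokers must be in a <host>:<port> format.");
            }
            brokers = args[3];
        }
        return new MigrationArguments(args[0], args[1], Boolean.parseBoolean(args[2]), brokers);
    }

    String getInputFile() { return inputFile; }
    String getOutputFile() { return outputFile; }
    boolean isUseTransactions() { return useTransactions; }
    String getKafkaBrokers() { return kafkaBrokers; }
}
```

This also leaves room for the other concern raised in the thread: a parser like this can later be swapped for named options (`--input-file`, `--output-file`) without disturbing callers, since they only see the wrapper's accessors.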
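The `ConsumeKafkaFlowMigrator` review comment notes that the constructor arguments `new KafkaProcessorDescriptor("Consume"), "property", "name", "property"` are duplicated in `ConsumeKafkaTemplateFlowMigrator` and suggests wrapping them into a constant "descriptor" instance. One possible sketch, with illustrative class and field names (only the string values come from the quoted code):

```java
// Hypothetical value object collecting the duplicated constructor arguments,
// shared as a constant by both consume-side migrators.
final class MigratorDescriptor {

    // The argument tuple quoted in the review, captured once.
    static final MigratorDescriptor CONSUME_FLOW =
            new MigratorDescriptor("Consume", "property", "name", "property");

    private final String processorType;
    private final String propertyTagName;
    private final String nameTagName;
    private final String propertyElementName;

    private MigratorDescriptor(final String processorType, final String propertyTagName,
                               final String nameTagName, final String propertyElementName) {
        this.processorType = processorType;
        this.propertyTagName = propertyTagName;
        this.nameTagName = nameTagName;
        this.propertyElementName = propertyElementName;
    }

    String getProcessorType() { return processorType; }
    String getPropertyTagName() { return propertyTagName; }
    String getNameTagName() { return nameTagName; }
    String getPropertyElementName() { return propertyElementName; }
}
```

A base constructor accepting a single `MigratorDescriptor` would then shrink both call sites to one argument and keep the tuple consistent if it ever changes.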