This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 984c0a0baf NIFI-11044 Script/commands to migrate Kafka processors 984c0a0baf is described below commit 984c0a0baf0cac189c2e2f6d711978e5de651db1 Author: Timea Barna <timea.ba...@gmail.com> AuthorDate: Wed Mar 1 15:03:53 2023 +0100 NIFI-11044 Script/commands to migrate Kafka processors This closes #6998. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- nifi-docs/src/main/asciidoc/toolkit-guide.adoc | 69 ++++- nifi-toolkit/nifi-toolkit-assembly/pom.xml | 5 + .../src/main/resources/bin/kafka-migrator.bat | 41 +++ .../src/main/resources/bin/kafka-migrator.sh | 119 +++++++++ nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml | 51 ++++ .../toolkit/kafkamigrator/KafkaMigratorMain.java | 130 ++++++++++ .../kafkamigrator/MigratorConfiguration.java | 95 +++++++ .../descriptor/FlowPropertyXpathDescriptor.java | 69 +++++ .../descriptor/KafkaProcessorDescriptor.java | 128 ++++++++++ .../descriptor/KafkaProcessorType.java | 33 +++ .../descriptor/ProcessorDescriptor.java | 26 ++ .../descriptor/PropertyXpathDescriptor.java | 25 ++ .../TemplatePropertyXpathDescriptor.java | 69 +++++ .../migrator/AbstractKafkaMigrator.java | 193 ++++++++++++++ .../migrator/ConsumeKafkaFlowMigrator.java | 38 +++ .../migrator/ConsumeKafkaTemplateMigrator.java | 52 ++++ .../toolkit/kafkamigrator/migrator/Migrator.java | 29 +++ .../migrator/PublishKafkaFlowMigrator.java | 48 ++++ .../migrator/PublishKafkaTemplateMigrator.java | 57 +++++ .../service/KafkaFlowMigrationService.java | 76 ++++++ .../service/KafkaMigrationService.java | 72 ++++++ .../service/KafkaTemplateMigrationService.java | 75 ++++++ .../kafkamigrator/KafkaMigrationServiceTest.java | 155 ++++++++++++ .../toolkit/kafkamigrator/KafkaMigrationUtil.java | 32 +++ .../toolkit/kafkamigrator/KafkaMigratorTest.java | 278 +++++++++++++++++++++ .../src/test/resources/flow.xml | 136 ++++++++++ nifi-toolkit/pom.xml | 1 + 27 files changed, 2101 insertions(+), 1 deletion(-) diff --git a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc index 6e24472adf..4c05dd6980 100644 --- a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc +++ b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc @@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to ZooKeeper 3.5.x, the migrator * For a ZooKeeper using Kerberos for authentication: ** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json` -6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. \ No newline at end of file +6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. + +[[kafka_migrator]] +== Kafka Processor Migrator +With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 were removed. +In large flows having many numbers of components it is challenging to replace these processors manually. +This tool can be used to update a flow in an automated way. + +=== Usage +Running the script requires 3 mandatory and 1 optional parameters: + +* Input file, the full path of the flow.xml.gz in which the replacement is required. +* Output file, the full path of the file where the results should be saved. +* Transaction, whether the new processors should be configured with or without transaction usage. +* Optional: Kafka Brokers, a comma separated list of Kafka Brokers in <host>:<port> format. + +Different input and output files must be used. +Kafka Broker argument can be omitted if flow does not contain GetKafka or PutKafka processors. + +1. Run script, a possible example: + + ./bin/kafka-migrator.sh -i "/tmp/flow/flow.xml.gz" -o "/tmp/flow/flow_result.xml.gz" -t false -k "mykafkaserver1:1234,mykafkaserver2:1235" + +2. Rename flow_result.xml.gz file to flow.xml.gz, do not overwrite your input file. +3. Copy flow.xml.gz file to all the NiFi nodes conf directory +4. Start NiFi +5. Verify the results. + +=== Expected Behaviour +* Flow replacement: +* For all replaced processors: +** changing class and artifact +** configure transaction as true +*** 'Delivery Guarantee' property will be set to 'Replicated' +*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in the file they will be set to true +*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be translated as true in NiFi +** configure transaction as false +*** 'Delivery Guarantee' property will keep its original setting. +*** 'Honor-Transactions' and 'Use-Transactions' will be set to false +* For version 0.8 processors (when kafka broker list argument provided) +** remove all version 0.8 properties +** add version 2.0 properties with default value except for 'Topic Name', 'Group ID', +'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction false) and +'Compression Codec' values which will be copied over + +* Template replacement: +* For all replaced processors: +** changing type and artifact +** configure transaction as true +*** 'Delivery Guarantee' property will be set to 'Replicated' +*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in the file they will be set to true +*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be translated as true in NiFi +** configure transaction as false +*** 'Delivery Guarantee' property will keep its original setting. +*** 'Honor-Transactions' and 'Use-Transactions' will be set to false +* For version 0.8 processors (when kafka broker list argument provided) +** remove all version 0.8 properties and descriptors +** add version 2.0 properties with default value except for 'Topic Name', 'Group ID', +'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction false) and +'Compression Codec' values which will be copied over +*** add version 2.0 descriptors + +=== Limitations +* All deprecated Kafka processors will be replaced with version 2.0 processors. +* Script will not rename the processors, only their type will be changed. +* Transaction setting will be applied to all the replaced processors. +* The flow.xml.gz file needs to be fit in memory and process time depends on the file size. +* Processors in templates will be replaced as well, please download the original templates if desired. \ No newline at end of file diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml b/nifi-toolkit/nifi-toolkit-assembly/pom.xml index 107592fccd..a42276e55f 100644 --- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml +++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml @@ -93,6 +93,11 @@ language governing permissions and limitations under the License. --> <artifactId>nifi-toolkit-flowanalyzer</artifactId> <version>1.21.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit-kafka-migrator</artifactId> + <version>1.21.0-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-toolkit-cli</artifactId> diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat new file mode 100644 index 0000000000..36fa74c722 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat @@ -0,0 +1,41 @@ +@echo off +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +rem Use JAVA_HOME if it's set; otherwise, just use java + +if "%JAVA_HOME%" == "" goto noJavaHome +if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome +set JAVA_EXE=%JAVA_HOME%\bin\java.exe +goto startConfig + +:noJavaHome +echo The JAVA_HOME environment variable is not defined correctly. +echo Instead the PATH will be used to find the java executable. +echo. +set JAVA_EXE=java +goto startConfig + +:startConfig +set LIB_DIR=%~dp0..\classpath;%~dp0..\lib + +if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms12m -Xmx24m + +SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS% org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain + +cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* "" + diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh new file mode 100644 index 0000000000..c4925ff87b --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh @@ -0,0 +1,119 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches + +SCRIPT_DIR=$(dirname "$0") +SCRIPT_NAME=$(basename "$0") +NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd) +PROGNAME=$(basename "$0") + + +warn() { + (>&2 echo "${PROGNAME}: $*") +} + +die() { + warn "$*" + exit 1 +} + +detectOS() { + # OS specific support (must be 'true' or 'false'). + cygwin=false; + aix=false; + os400=false; + darwin=false; + case "$(uname)" in + CYGWIN*) + cygwin=true + ;; + AIX*) + aix=true + ;; + OS400*) + os400=true + ;; + Darwin) + darwin=true + ;; + esac + # For AIX, set an environment variable + if ${aix}; then + export LDR_CNTRL=MAXDATA=0xB0000000@DSA + echo ${LDR_CNTRL} + fi +} + +locateJava() { + # Setup the Java Virtual Machine + if $cygwin ; then + [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}") + [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}") + fi + + if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then + JAVA_HOME=$(java-config --jre-home) + fi + if [ "x${JAVA}" = "x" ]; then + if [ "x${JAVA_HOME}" != "x" ]; then + if [ ! -d "${JAVA_HOME}" ]; then + die "JAVA_HOME is not valid: ${JAVA_HOME}" + fi + JAVA="${JAVA_HOME}/bin/java" + else + warn "JAVA_HOME not set; results may vary" + JAVA=$(type java) + JAVA=$(expr "${JAVA}" : '.* \(/.*\)$') + if [ "x${JAVA}" = "x" ]; then + die "java command not found" + fi + fi + fi +} + +init() { + # Determine if there is special OS handling we must perform + detectOS + + # Locate the Java VM to execute + locateJava "$1" +} + +run() { + LIBS="${NIFI_TOOLKIT_HOME}/lib/*" + + sudo_cmd_prefix="" + if $cygwin; then + NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}") + CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")" + else + CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}" + fi + + export JAVA_HOME="$JAVA_HOME" + export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME" + + umask 0077 + exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms12m -Xmx24m} org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain "$@" +} + + +init "$1" +run "$@" diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml b/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml new file mode 100644 index 0000000000..f9fbde5b8f --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit</artifactId> + <version>1.21.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-toolkit-kafka-migrator</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-xml-processing</artifactId> + <version>1.21.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/flow.xml</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java new file mode 100644 index 0000000000..29b26c94f8 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService; +import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService; +import org.apache.nifi.xml.processing.parsers.DocumentProvider; +import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider; +import org.apache.nifi.xml.processing.transform.StandardTransformProvider; +import org.w3c.dom.Document; + +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder; + +public class KafkaMigratorMain { + + private static void printUsage() { + System.out.println("This application replaces Kafka processors from version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" + + " in a flow.xml.gz file."); + System.out.println("\n"); + System.out.println("Usage: kafka-migrator.sh -i <path to input flow.xml.gz> -o <path to output flow.xml.gz>" + + " -t <use transaction true or false>\noptional: -k <comma separated kafka brokers in <host>:<port> format. " + + "Required for version 0.8 processors only>"); + } + + public static void main(final String[] args) throws Exception { + if (showingUsageNeeded(args)) { + printUsage(); + return; + } + + String input = ""; + if (args[0].equalsIgnoreCase("-i")) { + input = args[1]; + } + + String output = ""; + if (args[2].equalsIgnoreCase("-o")) { + output = args[3]; + } + if (input.equalsIgnoreCase(output)) { + System.out.println("Input and output files should be different."); + return; + } + + String transaction = ""; + if (args[4].equalsIgnoreCase("-t")) { + transaction = args[5]; + } + + if (!(transaction.equalsIgnoreCase("true") || transaction.equalsIgnoreCase("false"))) { + System.out.println("Transaction argument should be either true or false."); + return; + } + + String kafkaBrokers = ""; + if (args.length == 8) { + if (args[6].equalsIgnoreCase("-k") && args[7].matches(".+:\\d+")) { + kafkaBrokers = args[7]; + } else { + System.out.println("Kafka Brokers must be in a <host>:<port> format, can be separated by comma. " + + "For example: hostname:1234, host:5678"); + return; + } + } + + final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder(); + configurationBuilder.setKafkaBrokers(kafkaBrokers) + .setTransaction(Boolean.parseBoolean(transaction)); + + final InputStream fileStream = Files.newInputStream(Paths.get(input)); + final OutputStream outputStream = Files.newOutputStream(Paths.get(output)); + final InputStream gzipStream = new GZIPInputStream(fileStream); + final OutputStream gzipOutStream = new GZIPOutputStream(outputStream); + + System.out.println("Using flow=" + input); + + try { + final DocumentProvider documentProvider = new StandardDocumentProvider(); + final Document document = documentProvider.parse(gzipStream); + + final KafkaFlowMigrationService flowMigrationService = new KafkaFlowMigrationService(); + final KafkaTemplateMigrationService templateMigrationService = new KafkaTemplateMigrationService(); + + System.out.println("Replacing processors."); + flowMigrationService.replaceKafkaProcessors(document, configurationBuilder); + templateMigrationService.replaceKafkaProcessors(document, configurationBuilder); + + final StreamResult streamResult = new StreamResult(gzipOutStream); + final StandardTransformProvider transformProvider = new StandardTransformProvider(); + transformProvider.setIndent(true); + transformProvider.transform(new DOMSource(document), streamResult); + System.out.println("Replacing completed."); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Exception occurred while attempting to parse flow.xml.gz. Cause: " + e.getCause()); + } finally { + gzipOutStream.close(); + outputStream.close(); + gzipStream.close(); + fileStream.close(); + } + } + + private static boolean showingUsageNeeded(String[] args) { + return args.length < 6 || args[0].equalsIgnoreCase("-h") || args[0].equalsIgnoreCase("--help"); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java new file mode 100644 index 0000000000..3ffc33d0db --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.toolkit.kafkamigrator.descriptor.ProcessorDescriptor; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor; + +public class MigratorConfiguration { + final private String kafkaBrokers; + final private boolean transaction; + final private boolean isVersion8Processor; + final private ProcessorDescriptor processorDescriptor; + final private PropertyXpathDescriptor propertyXpathDescriptor; + + public MigratorConfiguration(final String kafkaBrokers, final boolean transaction, final boolean isVersion8Processor, + final ProcessorDescriptor processorDescriptor, final PropertyXpathDescriptor propertyXpathDescriptor) { + this.kafkaBrokers = kafkaBrokers; + this.transaction = transaction; + this.isVersion8Processor = isVersion8Processor; + this.processorDescriptor = processorDescriptor; + this.propertyXpathDescriptor = propertyXpathDescriptor; + } + + public String getKafkaBrokers() { + return kafkaBrokers; + } + + public boolean isTransaction() { + return transaction; + } + + public boolean isVersion8Processor() { + return isVersion8Processor; + } + + public ProcessorDescriptor getProcessorDescriptor() { + return processorDescriptor; + } + + public PropertyXpathDescriptor getPropertyXpathDescriptor() { + return propertyXpathDescriptor; + } + + public static class MigratorConfigurationBuilder { + private String kafkaBrokers; + private boolean transaction; + private boolean isVersion8Processor; + private ProcessorDescriptor processorDescriptor; + private PropertyXpathDescriptor propertyXpathDescriptor; + + public MigratorConfigurationBuilder setKafkaBrokers(final String kafkaBrokers) { + this.kafkaBrokers = kafkaBrokers; + return this; + } + + public MigratorConfigurationBuilder setTransaction(final boolean transaction) { + this.transaction = transaction; + return this; + } + + public MigratorConfigurationBuilder setIsVersion8Processor(final boolean isVersion8Processor) { + this.isVersion8Processor = isVersion8Processor; + return this; + } + + public MigratorConfigurationBuilder setProcessorDescriptor(final ProcessorDescriptor processorDescriptor) { + this.processorDescriptor = processorDescriptor; + return this; + } + + public MigratorConfigurationBuilder setPropertyXpathDescriptor(final PropertyXpathDescriptor propertyXpathDescriptor) { + this.propertyXpathDescriptor = propertyXpathDescriptor; + return this; + } + + public MigratorConfiguration build() { + return new MigratorConfiguration(kafkaBrokers, transaction, isVersion8Processor, + processorDescriptor, propertyXpathDescriptor); + } + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java new file mode 100644 index 0000000000..ad0aa453b0 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.descriptor; + +import java.util.HashMap; +import java.util.Map; + +public class FlowPropertyXpathDescriptor implements PropertyXpathDescriptor { + + private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES; + private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES; + private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES; + static { + CONSUME_TRANSACTION_PROPERTIES = new HashMap<>(); + CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"honor-transactions\"]/value"); + CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions"); + PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>(); + PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"use-transactions\"]/value"); + PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions"); + TRANSACTION_PROPERTIES = new HashMap<>(); + TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES); + TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES); + } + + private final KafkaProcessorType processorType; + + public FlowPropertyXpathDescriptor(final KafkaProcessorType processorType) { + this.processorType = processorType; + } + + @Override + public String getXpathForProperties() { + return "property"; + } + + @Override + public String getPropertyKeyTagName() { + return "name"; + } + + @Override + public String getPropertyTagName() { + return "property"; + } + + @Override + public String getXpathForTransactionProperty() { + return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty"); + } + + @Override + public String getTransactionTagName() { + return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName"); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java new file mode 100644 index 0000000000..11f507aaec --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.descriptor; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class KafkaProcessorDescriptor implements ProcessorDescriptor { + private static final Map<String, String> CONSUME_KAFKA_PROCESSOR_PROPERTIES; + private static final Map<String, String> CONSUME_PROPERTIES_TO_BE_SAVED; + private static final Map<String, String> PUBLISH_KAFKA_PROCESSOR_PROPERTIES; + private static final Map<String, String> PUBLISH_PROPERTIES_TO_BE_SAVED; + private static final Map<String, String> CONTROLLER_SERVICES; + private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES; + private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES_TO_BE_SAVED; + + static { + CONSUME_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>(); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol", "PLAINTEXT"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic_type", "names"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("group.id", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("auto.offset.reset", "latest"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("separate-by-key", "false"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("header-name-regex", null); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max.poll.records", "10000"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max-uncommit-offset-wait", "1 secs"); + CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("Communications Timeout", "60 secs"); + + CONSUME_PROPERTIES_TO_BE_SAVED = new HashMap<>(); + CONSUME_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic"); + CONSUME_PROPERTIES_TO_BE_SAVED.put("Group ID", "group.id"); + + PUBLISH_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>(); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol", "PLAINTEXT"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("topic", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("acks", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("Failure Strategy", "Route to Failure"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("transactional-id-prefix", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("attribute-name-regex", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kafka-key", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.request.size", "1 MB"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ack.wait.time", "5 secs"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.block.ms", "5 sec"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partition", null); + PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("compression.type", null); + + PUBLISH_PROPERTIES_TO_BE_SAVED = new HashMap<>(); + PUBLISH_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic"); + PUBLISH_PROPERTIES_TO_BE_SAVED.put("Partition", "partition"); + PUBLISH_PROPERTIES_TO_BE_SAVED.put("Kafka Key", "kafka-key"); + PUBLISH_PROPERTIES_TO_BE_SAVED.put("Delivery Guarantee", "acks"); + PUBLISH_PROPERTIES_TO_BE_SAVED.put( "Compression Codec", "compression.type"); + + CONTROLLER_SERVICES = new HashMap<>(); + CONTROLLER_SERVICES.put("kerberos-credentials-service", "org.apache.nifi.kerberos.KerberosCredentialsService"); + CONTROLLER_SERVICES.put("ssl.context.service", "org.apache.nifi.ssl.SSLContextService"); + + PROPERTIES = new HashMap<>(); + PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_KAFKA_PROCESSOR_PROPERTIES); + PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_KAFKA_PROCESSOR_PROPERTIES); + + PROPERTIES_TO_BE_SAVED = new HashMap<>(); + PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.CONSUME, CONSUME_PROPERTIES_TO_BE_SAVED); + PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.PUBLISH, PUBLISH_PROPERTIES_TO_BE_SAVED); + } + + private final KafkaProcessorType processorType; + + public KafkaProcessorDescriptor(final KafkaProcessorType processorType) { + this.processorType = processorType; + } + + @Override + public Map<String, String> getProcessorProperties() { + return Collections.unmodifiableMap(PROPERTIES.get(processorType)); + } + + @Override + public Map<String, String> getPropertiesToBeSaved() { + return Collections.unmodifiableMap(PROPERTIES_TO_BE_SAVED.get(processorType)); + } + + @Override + public Map<String, String> getControllerServicesForTemplates() { + return Collections.unmodifiableMap(CONTROLLER_SERVICES); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java new file mode 100644 index 0000000000..e5f8a5dde7 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.descriptor; + +public enum KafkaProcessorType { + PUBLISH("Publish"), + CONSUME("Consume"), + PUT("Put"); + + private final String processorType; + + KafkaProcessorType(String processorType) { + this.processorType = processorType; + } + + public String getProcessorType() { + return processorType; + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java new file mode 100644 index 0000000000..9a5f798b7d --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.descriptor; + +import java.util.Map; + +public interface ProcessorDescriptor { + Map<String, String> getProcessorProperties(); + Map<String, String> getPropertiesToBeSaved(); + Map<String, String> getControllerServicesForTemplates(); + +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java new file mode 100644 index 0000000000..ffb266f8c5 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.descriptor; + +public interface PropertyXpathDescriptor { + String getXpathForProperties(); + String getPropertyKeyTagName(); + String getPropertyTagName(); + String getXpathForTransactionProperty(); + String getTransactionTagName(); +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java new file mode 100644 index 0000000000..3276439375 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.descriptor; + +import java.util.HashMap; +import java.util.Map; + +public class TemplatePropertyXpathDescriptor implements PropertyXpathDescriptor { + + private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES; + private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES; + private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES; + static { + CONSUME_TRANSACTION_PROPERTIES = new HashMap<>(); + CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"honor-transactions\"]/value"); + CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions"); + PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>(); + PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"use-transactions\"]/value"); + PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions"); + TRANSACTION_PROPERTIES = new HashMap<>(); + TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES); + TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES); + } + + private final KafkaProcessorType processorType; + + public TemplatePropertyXpathDescriptor(final KafkaProcessorType processorType) { + this.processorType = processorType; + } + + @Override + public String getXpathForProperties() { + return "entry"; + } + + @Override + public String getPropertyKeyTagName() { + return "key"; + } + + @Override + public String getPropertyTagName() { + return "entry"; + } + + @Override + public String getXpathForTransactionProperty() { + return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty"); + } + + @Override + public String getTransactionTagName() { + return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName"); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java new file mode 100644 index 0000000000..ec02bff12e --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import java.util.HashMap; +import java.util.Map; + +public abstract class AbstractKafkaMigrator implements Migrator { + static final XPath XPATH = XPathFactory.newInstance().newXPath(); + private final static String NEW_KAFKA_PROCESSOR_VERSION = "_2_0"; + private final static String ARTIFACT = "nifi-kafka-2-0-nar"; + private final static String PATH_FOR_ARTIFACT = "bundle/artifact"; + + final boolean isVersion8Processor; + final boolean isKafkaBrokersPresent; + + final Map<String, String> kafkaProcessorProperties; + final Map<String, String> propertiesToBeSaved; + final Map<String, String> controllerServices; + + final String xpathForProperties; + final String propertyKeyTagName; + final String propertyTagName; + + final String xpathForTransactionProperty; + final String transactionTagName; + final boolean transaction; + + + public AbstractKafkaMigrator(final MigratorConfiguration configuration) { + final String kafkaBrokers = configuration.getKafkaBrokers(); + this.isKafkaBrokersPresent = !kafkaBrokers.isEmpty(); + this.isVersion8Processor = configuration.isVersion8Processor(); + this.kafkaProcessorProperties = new HashMap<>(configuration.getProcessorDescriptor().getProcessorProperties()); + this.propertiesToBeSaved = configuration.getProcessorDescriptor().getPropertiesToBeSaved(); + this.controllerServices = configuration.getProcessorDescriptor().getControllerServicesForTemplates(); + this.xpathForProperties = configuration.getPropertyXpathDescriptor().getXpathForProperties(); + this.propertyKeyTagName = configuration.getPropertyXpathDescriptor().getPropertyKeyTagName(); + this.propertyTagName = configuration.getPropertyXpathDescriptor().getPropertyTagName(); + this.xpathForTransactionProperty = configuration.getPropertyXpathDescriptor().getXpathForTransactionProperty(); + this.transactionTagName = configuration.getPropertyXpathDescriptor().getTransactionTagName(); + this.transaction = configuration.isTransaction(); + + if (isKafkaBrokersPresent) { + kafkaProcessorProperties.put("bootstrap.servers", kafkaBrokers); + } + } + + @Override + public void configureProperties(final Node node) throws XPathExpressionException { + if (isVersion8Processor && isKafkaBrokersPresent) { + final NodeList properties = (NodeList) XPATH.evaluate(xpathForProperties, node, XPathConstants.NODESET); + for (int i = 0; i < properties.getLength(); i++) { + final Node property = properties.item(i); + saveRequiredProperties(property); + removeElement(node, property); + } + addNewProperties(node); + } + } + + @Override + public void configureDescriptors(final Node node) throws XPathExpressionException { + if(isVersion8Processor && isKafkaBrokersPresent) { + final Element descriptorElement = (Element) XPATH.evaluate("config/descriptors", node, XPathConstants.NODE); + final NodeList descriptors = (NodeList) XPATH.evaluate("entry", descriptorElement, XPathConstants.NODESET); + for (int i = 0; i < descriptors.getLength(); i++) { + final Node descriptor = descriptors.item(i); + removeElement(descriptorElement, descriptor); + } + addNewDescriptors(descriptorElement); + } + } + + @Override + public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException { + final String transactionString = Boolean.toString(transaction); + final Element transactionsElement = (Element) XPATH.evaluate(xpathForTransactionProperty, node, XPathConstants.NODE); + + if (transactionsElement != null) { + transactionsElement.setTextContent(transactionString); + } else { + addNewProperty(node, transactionTagName, transactionString); + } + + kafkaProcessorProperties.put(transactionTagName, transactionString); + } + + public void replaceClassName(final Element className) { + final String processorName = StringUtils.substringAfterLast(className.getTextContent(), "."); + final String newClassName = replaceClassNameWithNewProcessorName(className.getTextContent(), processorName); + className.setTextContent(newClassName); + } + + public void replaceArtifact(final Node processor) throws XPathExpressionException { + ((Element) XPATH.evaluate(PATH_FOR_ARTIFACT, processor, XPathConstants.NODE)).setTextContent(ARTIFACT); + } + + private static String replaceClassNameWithNewProcessorName(final String className, final String processorName) { + final String newProcessorName = StringUtils.replaceEach(processorName, new String[]{"Get", "Put"}, new String[]{"pubsub.Consume", "pubsub.Publish"}); + final String processorNameWithNewVersion = + newProcessorName.replaceFirst("$|_0_1\\d?", NEW_KAFKA_PROCESSOR_VERSION); + return StringUtils.replace(className, processorName, processorNameWithNewVersion); + } + + private void addNewDescriptors(final Node node) { + for (String key: kafkaProcessorProperties.keySet()) { + final Element descriptorElement = node.getOwnerDocument().createElement("entry"); + node.appendChild(descriptorElement); + + final Element descriptorKeyElement = descriptorElement.getOwnerDocument().createElement("key"); + descriptorKeyElement.setTextContent(key); + descriptorElement.appendChild(descriptorKeyElement); + + final Element descriptorValueElement = descriptorElement.getOwnerDocument().createElement("value"); + descriptorElement.appendChild(descriptorValueElement); + + final Element descriptorNameElement = descriptorValueElement.getOwnerDocument().createElement("name"); + descriptorNameElement.setTextContent(key); + descriptorValueElement.appendChild(descriptorNameElement); + + if (controllerServices.containsKey(key)) { + final Element controllerServiceElement = descriptorValueElement.getOwnerDocument().createElement("identifiesControllerService"); + controllerServiceElement.setTextContent(controllerServices.get(key)); + descriptorValueElement.appendChild(controllerServiceElement); + } + } + } + + private void saveRequiredProperties(final Node property) throws XPathExpressionException { + final String propertyToBeSaved = propertiesToBeSaved.get(XPATH.evaluate(propertyKeyTagName, property)); + + if (propertyToBeSaved != null) { + String propertyValue = XPATH.evaluate("value", property); + kafkaProcessorProperties.put(propertyToBeSaved, convert(propertyValue)); + } + } + + private String convert(final String propertyValue) { + return propertyValue.isEmpty() ? null : propertyValue; + } + + private void addNewProperties(final Node node) { + for (Map.Entry<String, String> entry : kafkaProcessorProperties.entrySet()) { + addNewProperty(node, entry.getKey(), entry.getValue()); + } + } + + private void addNewProperty(final Node node, final String key, final String value) { + final Element propertyElement = node.getOwnerDocument().createElement(propertyTagName); + node.appendChild(propertyElement); + + final Element propertyKeyElement = propertyElement.getOwnerDocument().createElement(propertyKeyTagName); + propertyKeyElement.setTextContent(key); + + propertyElement.appendChild(propertyKeyElement); + + if (value != null) { + final Element propertyValueElement = propertyElement.getOwnerDocument().createElement("value"); + propertyValueElement.setTextContent(value); + + propertyElement.appendChild(propertyValueElement); + } + } + + private void removeElement(final Node node, final Node element) { + node.removeChild(element); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java new file mode 100644 index 0000000000..022c2db933 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPathExpressionException; + +public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator { + + public ConsumeKafkaFlowMigrator(final MigratorConfiguration configuration) { + super(configuration); + } + + @Override + public void migrate(final Element className, final Node processor) throws XPathExpressionException { + configureProperties(processor); + configureComponentSpecificSteps(processor); + replaceClassName(className); + replaceArtifact(processor); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java new file mode 100644 index 0000000000..3abb118a3c --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; + +public class ConsumeKafkaTemplateMigrator extends AbstractKafkaMigrator { + + public ConsumeKafkaTemplateMigrator(final MigratorConfiguration configuration) { + super(configuration); + } + + @Override + public void configureProperties(final Node node) throws XPathExpressionException { + final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE); + super.configureProperties(propertyElement); + } + + @Override + public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException { + final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE); + super.configureComponentSpecificSteps(propertyElement); + } + + @Override + public void migrate(final Element className, final Node processor) throws XPathExpressionException { + configureProperties(processor); + configureComponentSpecificSteps(processor); + configureDescriptors(processor); + replaceClassName(className); + replaceArtifact(processor); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java new file mode 100644 index 0000000000..cf41451099 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPathExpressionException; + +public interface Migrator { + void configureProperties(final Node node) throws XPathExpressionException; + void configureDescriptors(final Node node) throws XPathExpressionException; + void configureComponentSpecificSteps(final Node node) throws XPathExpressionException; + void migrate(final Element className, final Node node) throws XPathExpressionException; +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java new file mode 100644 index 0000000000..7ef865d25c --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; + +public class PublishKafkaFlowMigrator extends AbstractKafkaMigrator { + + public PublishKafkaFlowMigrator(final MigratorConfiguration configuration) { + super(configuration); + } + + @Override + public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException { + final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("property[name=\"acks\"]/value", node, XPathConstants.NODE); + if (this.transaction && deliveryGuaranteeValue != null) { + deliveryGuaranteeValue.setTextContent("all"); + } + super.configureComponentSpecificSteps(node); + } + + @Override + public void migrate(final Element className, final Node processor) throws XPathExpressionException { + configureProperties(processor); + configureComponentSpecificSteps(processor); + replaceClassName(className); + replaceArtifact(processor); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java new file mode 100644 index 0000000000..7b0538462a --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.migrator; + +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; + +public class PublishKafkaTemplateMigrator extends AbstractKafkaMigrator { + + public PublishKafkaTemplateMigrator(final MigratorConfiguration configuration) { + super(configuration); + } + + @Override + public void configureProperties(final Node node) throws XPathExpressionException { + final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE); + super.configureProperties(propertyElement); + } + + @Override + public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException { + //add value if null + final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE); + final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("entry[key=\"acks\"]/value", propertyElement, XPathConstants.NODE); + if (this.transaction && deliveryGuaranteeValue != null) { + deliveryGuaranteeValue.setTextContent("all"); + } + super.configureComponentSpecificSteps(propertyElement); + } + + @Override + public void migrate(final Element className, final Node processor) throws XPathExpressionException { + configureProperties(processor); + configureComponentSpecificSteps(processor); + configureDescriptors(processor); + replaceClassName(className); + replaceArtifact(processor); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java new file mode 100644 index 0000000000..f2cb98d19b --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.service; + +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType; +import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator; +import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator; +import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator; +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor; + + +public class KafkaFlowMigrationService implements KafkaMigrationService { + private static final String XPATH_FOR_PROCESSORS_IN_FLOW = ".//processor"; + private static final String CLASS_TAG_NAME = "class"; + + public KafkaFlowMigrationService() { + } + + @Override + public String getPathForProcessors() { + return XPATH_FOR_PROCESSORS_IN_FLOW; + } + + @Override + public String getPathForClass() { + return CLASS_TAG_NAME; + } + + @Override + public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH)) + .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH)); + return new PublishKafkaFlowMigrator(configurationBuilder.build()); + } + + @Override + public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME)) + .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME)); + return new ConsumeKafkaFlowMigrator(configurationBuilder.build()); + } + + @Override + public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH)) + .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH)); + return new PublishKafkaFlowMigrator(configurationBuilder.build()); + } + + @Override + public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME)) + .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME)); + return new ConsumeKafkaFlowMigrator(configurationBuilder.build()); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java new file mode 100644 index 0000000000..1428002077 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.service; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType; +import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder; + +public interface KafkaMigrationService { + + String REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES = "(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?"; + boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE; + boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE; + + String getPathForProcessors(); + String getPathForClass(); + Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder); + Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder); + Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder); + Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder); + + default void replaceKafkaProcessors(final Document document, final MigratorConfigurationBuilder configurationBuilder) throws XPathExpressionException { + Migrator migrator; + final XPath xPath = XPathFactory.newInstance().newXPath(); + + final NodeList processors = (NodeList) xPath.evaluate(getPathForProcessors(), document, XPathConstants.NODESET); + for (int i = 0; i < processors.getLength(); i++) { + final Node processor = processors.item(i); + final Element className = ((Element) xPath.evaluate(getPathForClass(), processor, XPathConstants.NODE)); + final String processorName = StringUtils.substringAfterLast(className.getTextContent(), "."); + + if (processorName.matches(REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES)) { + if (processorName.contains(KafkaProcessorType.PUBLISH.getProcessorType())) { + migrator = createPublishMigrator(configurationBuilder); + } else if (processorName.contains(KafkaProcessorType.PUT.getProcessorType())) { + migrator = createVersionEightPublishMigrator(configurationBuilder); + } else if (processorName.contains(KafkaProcessorType.CONSUME.getProcessorType())) { + migrator = createConsumeMigrator(configurationBuilder); + } else { + migrator = createVersionEightConsumeMigrator(configurationBuilder); + } + + migrator.migrate(className, processor); + } + } + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java new file mode 100644 index 0000000000..fa4b06d00a --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator.service; + +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType; +import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator; +import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator; +import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor; + +public class KafkaTemplateMigrationService implements KafkaMigrationService { + private static final String XPATH_FOR_PROCESSORS_IN_TEMPLATE = ".//processors"; + private static final String TYPE_TAG_NAME = "type"; + + public KafkaTemplateMigrationService() { + } + + @Override + public String getPathForProcessors() { + return XPATH_FOR_PROCESSORS_IN_TEMPLATE; + } + + @Override + public String getPathForClass() { + return TYPE_TAG_NAME; + } + + @Override + public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH)) + .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH)); + return new PublishKafkaTemplateMigrator(configurationBuilder.build()); + } + + @Override + public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME)) + .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME)); + return new ConsumeKafkaTemplateMigrator(configurationBuilder.build()); + } + + @Override + public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH)) + .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH)); + return new PublishKafkaTemplateMigrator(configurationBuilder.build()); + } + + @Override + public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) { + configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR) + .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME)) + .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME)); + return new ConsumeKafkaTemplateMigrator(configurationBuilder.build()); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java new file mode 100644 index 0000000000..5c3a54fb9c --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService; +import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService; +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class KafkaMigrationServiceTest { + + private static final List<String> EXPECTED_CLASS_OR_TYPE_NAMES = + Arrays.asList("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0", + "org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0", + "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0"); + + private static final List<String> EXPECTED_ARTIFACTS = + Arrays.asList("nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar"); + + private static final MigratorConfigurationBuilder CONFIGURATION_BUILDER = + new MigratorConfigurationBuilder().setKafkaBrokers("kafkaBrokers, localhost:1234") + .setTransaction(Boolean.FALSE); + private static final XPath XPATH = XPathFactory.newInstance().newXPath(); + private static final String PATH_FOR_PROCESSORS_IN_FLOWS = ".//processor"; + private static final String PATH_FOR_PROCESSORS_IN_TEMPLATES = ".//processors"; + private static final String PATH_FOR_CLASS_ELEMENT = "class"; + private static final String PATH_FOR_TYPE_ELEMENT = "type"; + private static final String PATH_FOR_ARTIFACT_ELEMENT = "bundle/artifact"; + + + @Test + public void testClassReplacement() throws XPathExpressionException, IOException { + final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService(); + final Document document = KafkaMigrationUtil.parseDocument(); + final List<String> originalClassNames = createClassResultList(document); + + kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER); + final List<String> actualClassNames = createClassResultList(document); + + assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualClassNames, originalClassNames); + } + + @Test + public void testTypeReplacement() throws XPathExpressionException, IOException { + final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService(); + final Document document = KafkaMigrationUtil.parseDocument(); + final List<String> originalTypeNames = createTypeResultList(document); + + kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER); + final List<String> actualTypeNames = createTypeResultList(document); + + assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualTypeNames, originalTypeNames); + } + + @Test + public void testArtifactReplacementInTemplate() throws XPathExpressionException, IOException { + final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService(); + final Document document = KafkaMigrationUtil.parseDocument(); + final List<String> originalArtifacts = createArtifactResultListForTemplate(document); + + kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER); + final List<String> actualArtifacts = createArtifactResultListForTemplate(document); + + assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts); + } + + @Test + public void testArtifactReplacementInFlow() throws XPathExpressionException, IOException { + final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService(); + final Document document = KafkaMigrationUtil.parseDocument(); + final List<String> originalArtifacts = createArtifactResultListForFlow(document); + + kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER); + final List<String> actualArtifacts = createArtifactResultListForFlow(document); + + assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts); + } + + private List<String> createClassResultList(final Document document) throws XPathExpressionException { + return createProcessorResultListForFlow(document, PATH_FOR_CLASS_ELEMENT); + } + + private List<String> createArtifactResultListForFlow(final Document document) throws XPathExpressionException { + return createProcessorResultListForFlow(document, PATH_FOR_ARTIFACT_ELEMENT); + } + + private List<String> createTypeResultList(final Document document) throws XPathExpressionException { + return createProcessorResultListForTemplate(document, PATH_FOR_TYPE_ELEMENT); + } + + private List<String> createArtifactResultListForTemplate(final Document document) throws XPathExpressionException { + return createProcessorResultListForTemplate(document, PATH_FOR_ARTIFACT_ELEMENT); + } + + private List<String> createProcessorResultListForFlow(final Document document, final String elementPath) throws XPathExpressionException { + return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_FLOWS, elementPath); + } + + private List<String> createProcessorResultListForTemplate(final Document document, final String elementPath) throws XPathExpressionException { + return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_TEMPLATES, elementPath); + } + + private List<String> createProcessorResultList(final Document document, final String processorPath, final String elementPath) throws XPathExpressionException { + final List<String> resultList = new ArrayList<>(); + final NodeList processors = (NodeList) XPATH.evaluate(processorPath, document, XPathConstants.NODESET); + for (int i = 0; i < processors.getLength(); i++) { + resultList.add(XPATH.evaluate(elementPath, processors.item(i))); + } + return resultList; + } + + private void assertSuccess(final List<String> expectedArtifacts, final List<String> actualArtifacts, final List<String> originalArtifacts) { + assertArrayEquals(expectedArtifacts.toArray(), actualArtifacts.toArray()); + assertNoReplacementHappened(originalArtifacts, actualArtifacts); + assertReplacementHappened(originalArtifacts, actualArtifacts); + } + + private void assertNoReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) { + assertEquals(originalArtifacts.get(0), actualArtifacts.get(0)); + } + + private void assertReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) { + assertNotEquals(originalArtifacts.get(1), actualArtifacts.get(1)); + assertNotEquals(originalArtifacts.get(2), actualArtifacts.get(2)); + } +} \ No newline at end of file diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java new file mode 100644 index 0000000000..b095e1cbc4 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.xml.processing.parsers.DocumentProvider; +import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider; +import org.w3c.dom.Document; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class KafkaMigrationUtil { + public static Document parseDocument() throws IOException { + final DocumentProvider documentProvider = new StandardDocumentProvider(); + return documentProvider.parse(Files.newInputStream(Paths.get("src/test/resources/flow.xml"))); + } +} diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java new file mode 100644 index 0000000000..6394516ece --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.kafkamigrator; + +import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor; +import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor; +import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator; +import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator; +import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator; +import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator; +import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder; +import org.junit.jupiter.api.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Node; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaMigratorTest { + private static final XPath XPATH = XPathFactory.newInstance().newXPath(); + private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.PutKafka']"; + private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.PutKafka']"; + + private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']"; + private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']"; + private static final KafkaProcessorDescriptor PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR = new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH); + private static final boolean WITH_TRANSACTION = Boolean.TRUE; + private static final boolean WITHOUT_TRANSACTION = Boolean.FALSE; + private static final boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE; + private static final boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE; + private static final String FLOW = "Flow"; + private static final String TEMPLATE = "Template"; + + @Test + public void testPropertiesRemoved() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW); + final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE); + + flowMigrator.configureProperties(processor); + + assertPropertyRemoveSuccess(processor); + } + + @Test + public void testPropertiesAdded() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,KafkaProcessorType.PUBLISH, FLOW); + final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE); + + flowMigrator.configureProperties(processor); + + assertPropertyAddSuccess(processor); + } + + @Test + public void testPropertiesSaved() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW); + final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE); + final List<String> oldValues = getOldValues(processor); + + flowMigrator.configureProperties(processor); + + final List<String> newValues = getNewValues(processor); + assertEquals(oldValues, newValues); + } + + @Test + public void testDescriptorsAdded() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE); + final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE); + + templateMigrator.configureDescriptors(processor); + + assertDescriptorAddSuccess(processor); + } + + @Test + public void testDescriptorsRemoved() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE); + final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE); + + templateMigrator.configureDescriptors(processor); + + assertDescriptorRemoveSuccess(processor); + } + + @Test + public void testTransactionFlowPropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW); + final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE); + + flowMigrator.configureComponentSpecificSteps(processor); + + assertEquals("true", XPATH.evaluate("property[name='honor-transactions']/value", processor)); + } + + @Test + public void testTransactionTemplatePropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE); + final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE); + + templateMigrator.configureComponentSpecificSteps(processor); + + assertEquals("true", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor)); + } + + @Test + public void testTransactionFlowPropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW); + final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE); + + flowMigrator.configureComponentSpecificSteps(processor); + + assertEquals("false", XPATH.evaluate("property[name='honor-transactions']/value", processor)); + } + + @Test + public void testTransactionTemplatePropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE); + final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE); + + templateMigrator.configureComponentSpecificSteps(processor); + + assertEquals("false", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor)); + } + + @Test + public void testTransactionFlowPropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW); + final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE); + + flowMigrator.configureComponentSpecificSteps(processor); + + assertEquals("true", XPATH.evaluate("property[name='use-transactions']/value", processor)); + assertEquals("", XPATH.evaluate("property[name='acks']/value", processor)); + + } + + @Test + public void testTransactionTemplatePropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE); + final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE); + + templateMigrator.configureComponentSpecificSteps(processor); + + assertEquals("true", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor)); + assertEquals("", XPATH.evaluate("config/properties/entry[key='acks']/value", processor)); + } + + @Test + public void testTransactionFlowPropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW); + final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE); + + flowMigrator.configureComponentSpecificSteps(processor); + + assertEquals("false", XPATH.evaluate("property[name='use-transactions']/value", processor)); + } + + @Test + public void testTransactionTemplatePropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException { + final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE); + final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration); + final Document document = KafkaMigrationUtil.parseDocument(); + final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE); + + templateMigrator.configureComponentSpecificSteps(processor); + + assertEquals("false", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor)); + } + + + private List<String> getValues(final Collection<String> properties, final Node node) throws XPathExpressionException { + final List<String> result = new ArrayList<>(); + for (String propertyName : properties) { + result.add(XPATH.evaluate(String.format("property[name='%s']/value", propertyName), node)); + } + return result; + } + + private List<String> getOldValues(final Node node) throws XPathExpressionException { + return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().keySet(), node); + } + + private List<String> getNewValues(final Node node) throws XPathExpressionException { + return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().values(), node); + } + + private void assertPropertyRemoveSuccess(final Node node) throws XPathExpressionException { + assertTrue(XPATH.evaluate("property[name='Known Brokers']", node).isEmpty()); + } + + private void assertDescriptorRemoveSuccess(final Node node) throws XPathExpressionException { + assertTrue(XPATH.evaluate("config/descriptors/entry[key='Known Brokers']", node).isEmpty()); + } + + private void assertAddSuccess(final String xpath, final Node node) throws XPathExpressionException { + for (String propertyName: PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getProcessorProperties().keySet()) { + assertFalse(XPATH.evaluate(String.format(xpath, propertyName), node).isEmpty()); + } + } + private void assertPropertyAddSuccess(final Node node) throws XPathExpressionException { + assertAddSuccess("property[name='%s']/name", node); + } + + private void assertDescriptorAddSuccess(final Node node) throws XPathExpressionException { + assertAddSuccess("config/descriptors/entry[key='%s']/key", node); + } + + private MigratorConfiguration getConfiguration(final boolean transaction, final boolean isVersion8Processor, + final KafkaProcessorType processorType, final String migrationType) { + final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder(); + final PropertyXpathDescriptor propertyXpathDescriptor; + + if (migrationType.equalsIgnoreCase("Flow")) { + propertyXpathDescriptor = new FlowPropertyXpathDescriptor(processorType); + } else { + propertyXpathDescriptor= new TemplatePropertyXpathDescriptor(processorType); + } + + return configurationBuilder.setKafkaBrokers("kafkaBrokers, localhost:1234") + .setTransaction(transaction) + .setIsVersion8Processor(isVersion8Processor) + .setProcessorDescriptor(new KafkaProcessorDescriptor(processorType)) + .setPropertyXpathDescriptor(propertyXpathDescriptor) + .build(); + } +} \ No newline at end of file diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml new file mode 100644 index 0000000000..b1cc3b5216 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml @@ -0,0 +1,136 @@ +<rootGroup> + <processGroup> + <processor> + <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-kafka-2-0-nar</artifact> + <version>1.13.2.2.1.2.0-283</version> + </bundle> + <property> + <name>bootstrap.servers</name> + <value>localhost:9092</value> + </property> + </processor> + <processor> + <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-kafka-0-10-nar</artifact> + <version>1.13.2.2.1.2.0-283</version> + </bundle> + <property> + <name>bootstrap.servers</name> + <value>localhost:9092</value> + </property> + </processor> + <processor> + <class>org.apache.nifi.processors.kafka.PutKafka</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-kafka-0-8-nar</artifact> + <version>1.13.2.2.1.2.0-283</version> + </bundle> + <property> + <name>Known Brokers</name> + </property> + <property> + <name>Topic Name</name> + <value>test-topic</value> + </property> + <property> + <name>Partition</name> + <value>test-partition</value> + </property> + <property> + <name>Kafka Key</name> + <value>kafka-key</value> + </property> + <property> + <name>Delivery Guarantee</name> + <value>1</value> + </property> + <property> + <name>Compression Codec</name> + <value>gzip</value> + </property> + </processor> + </processGroup> + <template encoding-version="1.3"> + <snippet> + <processGroups> + <contents> + <processors> + <bundle> + <artifact>nifi-kafka-2-0-nar</artifact> + <group>org.apache.nifi</group> + <version>1.13.2.2.1.2.0-283</version> + </bundle> + <config> + <descriptors> + <entry> + <key>Known Brokers</key> + <value> + <name>Known Brokers</name> + </value> + </entry> + </descriptors> + <properties> + <entry> + <key>Known Brokers</key> + </entry> + </properties> + </config> + <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</type> + </processors> + <processors> + <bundle> + <artifact>nifi-kafka-0-10-nar</artifact> + <group>org.apache.nifi</group> + <version>1.13.2.2.1.2.0-283</version> + </bundle> + <config> + <descriptors> + <entry> + <key>Known Brokers</key> + <value> + <name>Known Brokers</name> + </value> + </entry> + </descriptors> + <properties> + <entry> + <key>Known Brokers</key> + </entry> + </properties> + </config> + <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</type> + </processors> + <processors> + <bundle> + <artifact>nifi-kafka-0-8-nar</artifact> + <group>org.apache.nifi</group> + <version>1.13.2.2.1.2.0-283</version> + </bundle> + <config> + <descriptors> + <entry> + <key>Known Brokers</key> + <value> + <name>Known Brokers</name> + </value> + </entry> + </descriptors> + <properties> + <entry> + <key>Known Brokers</key> + </entry> + </properties> + </config> + <type>org.apache.nifi.processors.kafka.PutKafka</type> + </processors> + </contents> + </processGroups> + </snippet> + </template> +</rootGroup> \ No newline at end of file diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml index d4d8c1f63b..a65c649c04 100644 --- a/nifi-toolkit/pom.xml +++ b/nifi-toolkit/pom.xml @@ -33,6 +33,7 @@ <module>nifi-toolkit-flowanalyzer</module> <module>nifi-toolkit-cli</module> <module>nifi-toolkit-api</module> + <module>nifi-toolkit-kafka-migrator</module> </modules> <properties> <toolkit.groovy.version>3.0.8</toolkit.groovy.version>