This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 984c0a0baf NIFI-11044 Script/commands to migrate Kafka processors
984c0a0baf is described below

commit 984c0a0baf0cac189c2e2f6d711978e5de651db1
Author: Timea Barna <timea.ba...@gmail.com>
AuthorDate: Wed Mar 1 15:03:53 2023 +0100

    NIFI-11044 Script/commands to migrate Kafka processors
    
    This closes #6998.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 nifi-docs/src/main/asciidoc/toolkit-guide.adoc     |  69 ++++-
 nifi-toolkit/nifi-toolkit-assembly/pom.xml         |   5 +
 .../src/main/resources/bin/kafka-migrator.bat      |  41 +++
 .../src/main/resources/bin/kafka-migrator.sh       | 119 +++++++++
 nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml   |  51 ++++
 .../toolkit/kafkamigrator/KafkaMigratorMain.java   | 130 ++++++++++
 .../kafkamigrator/MigratorConfiguration.java       |  95 +++++++
 .../descriptor/FlowPropertyXpathDescriptor.java    |  69 +++++
 .../descriptor/KafkaProcessorDescriptor.java       | 128 ++++++++++
 .../descriptor/KafkaProcessorType.java             |  33 +++
 .../descriptor/ProcessorDescriptor.java            |  26 ++
 .../descriptor/PropertyXpathDescriptor.java        |  25 ++
 .../TemplatePropertyXpathDescriptor.java           |  69 +++++
 .../migrator/AbstractKafkaMigrator.java            | 193 ++++++++++++++
 .../migrator/ConsumeKafkaFlowMigrator.java         |  38 +++
 .../migrator/ConsumeKafkaTemplateMigrator.java     |  52 ++++
 .../toolkit/kafkamigrator/migrator/Migrator.java   |  29 +++
 .../migrator/PublishKafkaFlowMigrator.java         |  48 ++++
 .../migrator/PublishKafkaTemplateMigrator.java     |  57 +++++
 .../service/KafkaFlowMigrationService.java         |  76 ++++++
 .../service/KafkaMigrationService.java             |  72 ++++++
 .../service/KafkaTemplateMigrationService.java     |  75 ++++++
 .../kafkamigrator/KafkaMigrationServiceTest.java   | 155 ++++++++++++
 .../toolkit/kafkamigrator/KafkaMigrationUtil.java  |  32 +++
 .../toolkit/kafkamigrator/KafkaMigratorTest.java   | 278 +++++++++++++++++++++
 .../src/test/resources/flow.xml                    | 136 ++++++++++
 nifi-toolkit/pom.xml                               |   1 +
 27 files changed, 2101 insertions(+), 1 deletion(-)
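
Note on the `-k` check in KafkaMigratorMain in this commit: the pattern `.+:\d+` is matched against the whole argument, so it only confirms that the argument ends in `:<digits>`. A list such as `mykafkaserver1,mykafkaserver2:1235` would therefore pass even though the first entry has no port. The following is a minimal sketch of validating each entry separately. It assumes a hypothetical helper class `BrokerListValidator` that is not part of this commit, and it assumes hosts contain no colons (so IPv6 literals are out of scope).

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the NiFi toolkit.
public class BrokerListValidator {
    // One broker entry: a host without ':', ',' or whitespace, then ':' and a numeric port.
    private static final Pattern BROKER = Pattern.compile("[^:,\\s]+:\\d+");

    public static boolean isValid(final String brokers) {
        if (brokers == null || brokers.isEmpty()) {
            return false;
        }
        // A limit of -1 keeps trailing empty strings, so "host:1," is rejected.
        return Arrays.stream(brokers.split(",", -1))
                .allMatch(entry -> BROKER.matcher(entry).matches());
    }

    public static void main(final String[] args) {
        System.out.println(isValid("mykafkaserver1:1234,mykafkaserver2:1235")); // prints "true"
        System.out.println(isValid("mykafkaserver1,mykafkaserver2:1235"));      // prints "false"
    }
}
```

Checking entries one by one also rejects a list written with a space after the comma, which a shell would otherwise split into two separate arguments.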

diff --git a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
index 6e24472adf..4c05dd6980 100644
--- a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
@@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to ZooKeeper 3.5.x, the migrator
 * For a ZooKeeper using Kerberos for authentication:
 ** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
 
-6. Once the migration has completed successfully, start the processors in the NiFi flow.  Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
\ No newline at end of file
+6. Once the migration has completed successfully, start the processors in the NiFi flow.  Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
+
+[[kafka_migrator]]
+== Kafka Processor Migrator
+With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 were removed.
+In large flows with many components, replacing these processors manually is challenging.
+This tool can be used to update a flow in an automated way.
+
+=== Usage
+Running the script requires three mandatory parameters and accepts one optional parameter:
+
+* Input file, the full path of the flow.xml.gz in which the replacement is required.
+* Output file, the full path of the file where the results should be saved.
+* Transaction, whether the new processors should be configured with or without transaction usage.
+* Optional: Kafka Brokers, a comma-separated list of Kafka Brokers in <host>:<port> format.
+
+Different input and output files must be used.
+The Kafka Brokers argument can be omitted if the flow does not contain GetKafka or PutKafka processors.
+
+1. Run the script, for example:
+
+ ./bin/kafka-migrator.sh -i "/tmp/flow/flow.xml.gz" -o "/tmp/flow/flow_result.xml.gz" -t false -k "mykafkaserver1:1234,mykafkaserver2:1235"
+
+2. Rename the flow_result.xml.gz file to flow.xml.gz; do not overwrite your input file.
+3. Copy the flow.xml.gz file to the conf directory of every NiFi node.
+4. Start NiFi.
+5. Verify the results.
+
+=== Expected Behaviour
+* Flow replacement:
+* For all replaced processors:
+** the class and artifact are changed
+** if transaction is configured as true
+*** the 'Delivery Guarantee' property will be set to 'Replicated'
+*** if the 'Honor-Transactions' and 'Use-Transactions' properties are present in the file, they will be set to true
+*** if the 'Honor-Transactions' and 'Use-Transactions' properties are not present, NiFi will interpret them as true
+** if transaction is configured as false
+*** the 'Delivery Guarantee' property will keep its original setting
+*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
+* For version 0.8 processors (when the Kafka broker list argument is provided)
+** all version 0.8 properties are removed
+** version 2.0 properties are added with their default values, except for 'Topic Name', 'Group ID',
+'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction is false) and
+'Compression Codec', whose values will be copied over
+
+* Template replacement:
+* For all replaced processors:
+** the type and artifact are changed
+** if transaction is configured as true
+*** the 'Delivery Guarantee' property will be set to 'Replicated'
+*** if the 'Honor-Transactions' and 'Use-Transactions' properties are present in the file, they will be set to true
+*** if the 'Honor-Transactions' and 'Use-Transactions' properties are not present, NiFi will interpret them as true
+** if transaction is configured as false
+*** the 'Delivery Guarantee' property will keep its original setting
+*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
+* For version 0.8 processors (when the Kafka broker list argument is provided)
+** all version 0.8 properties and descriptors are removed
+** version 2.0 properties are added with their default values, except for 'Topic Name', 'Group ID',
+'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction is false) and
+'Compression Codec', whose values will be copied over
+*** add version 2.0 descriptors
+
+=== Limitations
+* All deprecated Kafka processors will be replaced with version 2.0 processors.
+* The script will not rename the processors; only their type will be changed.
+* The transaction setting will be applied to all the replaced processors.
+* The flow.xml.gz file needs to fit in memory, and the processing time depends on the file size.
+* Processors in templates will be replaced as well; download the original templates beforehand if you want to keep them.
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
index 107592fccd..a42276e55f 100644
--- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml
+++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
@@ -93,6 +93,11 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-toolkit-flowanalyzer</artifactId>
             <version>1.21.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-toolkit-kafka-migrator</artifactId>
+            <version>1.21.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-toolkit-cli</artifactId>
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat
new file mode 100644
index 0000000000..36fa74c722
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat
@@ -0,0 +1,41 @@
+@echo off
+rem
+rem    Licensed to the Apache Software Foundation (ASF) under one or more
+rem    contributor license agreements.  See the NOTICE file distributed with
+rem    this work for additional information regarding copyright ownership.
+rem    The ASF licenses this file to You under the Apache License, Version 2.0
+rem    (the "License"); you may not use this file except in compliance with
+rem    the License.  You may obtain a copy of the License at
+rem
+rem       http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem    Unless required by applicable law or agreed to in writing, software
+rem    distributed under the License is distributed on an "AS IS" BASIS,
+rem    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem    See the License for the specific language governing permissions and
+rem    limitations under the License.
+rem
+
+rem Use JAVA_HOME if it's set; otherwise, just use java
+
+if "%JAVA_HOME%" == "" goto noJavaHome
+if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
+set JAVA_EXE=%JAVA_HOME%\bin\java.exe
+goto startConfig
+
+:noJavaHome
+echo The JAVA_HOME environment variable is not defined correctly.
+echo Instead the PATH will be used to find the java executable.
+echo.
+set JAVA_EXE=java
+goto startConfig
+
+:startConfig
+set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
+
+if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms12m -Xmx24m
+
+SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS% org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain
+
+cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""
+
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh
new file mode 100644
index 0000000000..c4925ff87b
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh
@@ -0,0 +1,119 @@
+#!/bin/sh
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+#
+
+# Script structure inspired by Apache Karaf and other Apache projects with similar startup approaches
+
+SCRIPT_DIR=$(dirname "$0")
+SCRIPT_NAME=$(basename "$0")
+NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
+PROGNAME=$(basename "$0")
+
+
+warn() {
+    (>&2 echo "${PROGNAME}: $*")
+}
+
+die() {
+    warn "$*"
+    exit 1
+}
+
+detectOS() {
+    # OS specific support (must be 'true' or 'false').
+    cygwin=false;
+    aix=false;
+    os400=false;
+    darwin=false;
+    case "$(uname)" in
+        CYGWIN*)
+            cygwin=true
+            ;;
+        AIX*)
+            aix=true
+            ;;
+        OS400*)
+            os400=true
+            ;;
+        Darwin)
+            darwin=true
+            ;;
+    esac
+    # For AIX, set an environment variable
+    if ${aix}; then
+         export LDR_CNTRL=MAXDATA=0xB0000000@DSA
+         echo ${LDR_CNTRL}
+    fi
+}
+
+locateJava() {
+    # Setup the Java Virtual Machine
+    if $cygwin ; then
+        [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
+        [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
+    fi
+
+    if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
+        JAVA_HOME=$(java-config --jre-home)
+    fi
+    if [ "x${JAVA}" = "x" ]; then
+        if [ "x${JAVA_HOME}" != "x" ]; then
+            if [ ! -d "${JAVA_HOME}" ]; then
+                die "JAVA_HOME is not valid: ${JAVA_HOME}"
+            fi
+            JAVA="${JAVA_HOME}/bin/java"
+        else
+            warn "JAVA_HOME not set; results may vary"
+            JAVA=$(type java)
+            JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
+            if [ "x${JAVA}" = "x" ]; then
+                die "java command not found"
+            fi
+        fi
+    fi
+}
+
+init() {
+    # Determine if there is special OS handling we must perform
+    detectOS
+
+    # Locate the Java VM to execute
+    locateJava "$1"
+}
+
+run() {
+    LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
+
+    sudo_cmd_prefix=""
+    if $cygwin; then
+        NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
+        CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")"
+    else
+        CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
+    fi
+
+   export JAVA_HOME="$JAVA_HOME"
+   export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
+
+   umask 0077
+   exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms12m -Xmx24m} org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain "$@"
+}
+
+
+init "$1"
+run "$@"
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml b/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml
new file mode 100644
index 0000000000..f9fbde5b8f
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-toolkit</artifactId>
+        <version>1.21.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-toolkit-kafka-migrator</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-xml-processing</artifactId>
+            <version>1.21.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/flow.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java
new file mode 100644
index 0000000000..29b26c94f8
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
+import org.apache.nifi.xml.processing.parsers.DocumentProvider;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.apache.nifi.xml.processing.transform.StandardTransformProvider;
+import org.w3c.dom.Document;
+
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+
+public class KafkaMigratorMain {
+
+    private static void printUsage() {
+        System.out.println("This application replaces Kafka processors from version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" +
+                " in a flow.xml.gz file.");
+        System.out.println("\n");
+        System.out.println("Usage: kafka-migrator.sh -i <path to input flow.xml.gz> -o <path to output flow.xml.gz>" +
+                " -t <use transaction true or false>\noptional: -k <comma separated kafka brokers in <host>:<port> format. " +
+                "Required for version 0.8 processors only>");
+    }
+
+    public static void main(final String[] args) throws Exception {
+        if (showingUsageNeeded(args)) {
+            printUsage();
+            return;
+        }
+
+        String input = "";
+        if (args[0].equalsIgnoreCase("-i")) {
+            input = args[1];
+        }
+
+        String output = "";
+        if (args[2].equalsIgnoreCase("-o")) {
+             output = args[3];
+        }
+        if (input.equalsIgnoreCase(output)) {
+            System.out.println("Input and output files should be different.");
+            return;
+        }
+
+        String transaction = "";
+        if (args[4].equalsIgnoreCase("-t")) {
+            transaction = args[5];
+        }
+
+        if (!(transaction.equalsIgnoreCase("true") || transaction.equalsIgnoreCase("false"))) {
+            System.out.println("Transaction argument should be either true or false.");
+            return;
+        }
+
+        String kafkaBrokers = "";
+        if (args.length == 8) {
+            if (args[6].equalsIgnoreCase("-k") && args[7].matches(".+:\\d+")) {
+                kafkaBrokers = args[7];
+            } else {
+                System.out.println("Kafka Brokers must be in a <host>:<port> format, can be separated by comma. " +
+                        "For example: hostname:1234,host:5678");
+                return;
+            }
+        }
+
+        final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder();
+        configurationBuilder.setKafkaBrokers(kafkaBrokers)
+                            .setTransaction(Boolean.parseBoolean(transaction));
+
+        final InputStream fileStream = Files.newInputStream(Paths.get(input));
+        final OutputStream outputStream = Files.newOutputStream(Paths.get(output));
+        final InputStream gzipStream = new GZIPInputStream(fileStream);
+        final OutputStream gzipOutStream = new GZIPOutputStream(outputStream);
+
+        System.out.println("Using flow=" + input);
+
+        try {
+            final DocumentProvider documentProvider = new StandardDocumentProvider();
+            final Document document = documentProvider.parse(gzipStream);
+
+            final KafkaFlowMigrationService flowMigrationService = new KafkaFlowMigrationService();
+            final KafkaTemplateMigrationService templateMigrationService = new KafkaTemplateMigrationService();
+
+            System.out.println("Replacing processors.");
+            flowMigrationService.replaceKafkaProcessors(document, configurationBuilder);
+            templateMigrationService.replaceKafkaProcessors(document, configurationBuilder);
+
+            final StreamResult streamResult = new StreamResult(gzipOutStream);
+            final StandardTransformProvider transformProvider = new StandardTransformProvider();
+            transformProvider.setIndent(true);
+            transformProvider.transform(new DOMSource(document), streamResult);
+            System.out.println("Replacing completed.");
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("Exception occurred while attempting to parse flow.xml.gz.  Cause: " + e.getCause());
+        } finally {
+            gzipOutStream.close();
+            outputStream.close();
+            gzipStream.close();
+            fileStream.close();
+        }
+    }
+
+    private static boolean showingUsageNeeded(String[] args) {
+        return args.length < 6 || args[0].equalsIgnoreCase("-h") || args[0].equalsIgnoreCase("--help");
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java
new file mode 100644
index 0000000000..3ffc33d0db
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.ProcessorDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
+
+public class MigratorConfiguration {
+    final private String kafkaBrokers;
+    final private boolean transaction;
+    final private boolean isVersion8Processor;
+    final private ProcessorDescriptor processorDescriptor;
+    final private PropertyXpathDescriptor propertyXpathDescriptor;
+
+    public MigratorConfiguration(final String kafkaBrokers, final boolean transaction, final boolean isVersion8Processor,
+                                 final ProcessorDescriptor processorDescriptor, final PropertyXpathDescriptor propertyXpathDescriptor) {
+        this.kafkaBrokers = kafkaBrokers;
+        this.transaction = transaction;
+        this.isVersion8Processor = isVersion8Processor;
+        this.processorDescriptor = processorDescriptor;
+        this.propertyXpathDescriptor = propertyXpathDescriptor;
+    }
+
+    public String getKafkaBrokers() {
+        return kafkaBrokers;
+    }
+
+    public boolean isTransaction() {
+        return transaction;
+    }
+
+    public boolean isVersion8Processor() {
+        return isVersion8Processor;
+    }
+
+    public ProcessorDescriptor getProcessorDescriptor() {
+        return processorDescriptor;
+    }
+
+    public PropertyXpathDescriptor getPropertyXpathDescriptor() {
+        return propertyXpathDescriptor;
+    }
+
+    public static class MigratorConfigurationBuilder {
+        private String kafkaBrokers;
+        private boolean transaction;
+        private boolean isVersion8Processor;
+        private ProcessorDescriptor processorDescriptor;
+        private PropertyXpathDescriptor propertyXpathDescriptor;
+
+        public MigratorConfigurationBuilder setKafkaBrokers(final String kafkaBrokers) {
+            this.kafkaBrokers = kafkaBrokers;
+            return this;
+        }
+
+        public MigratorConfigurationBuilder setTransaction(final boolean transaction) {
+            this.transaction = transaction;
+            return this;
+        }
+
+        public MigratorConfigurationBuilder setIsVersion8Processor(final boolean isVersion8Processor) {
+            this.isVersion8Processor = isVersion8Processor;
+            return this;
+        }
+
+        public MigratorConfigurationBuilder setProcessorDescriptor(final ProcessorDescriptor processorDescriptor) {
+            this.processorDescriptor = processorDescriptor;
+            return this;
+        }
+
+        public MigratorConfigurationBuilder setPropertyXpathDescriptor(final PropertyXpathDescriptor propertyXpathDescriptor) {
+            this.propertyXpathDescriptor = propertyXpathDescriptor;
+            return this;
+        }
+
+        public MigratorConfiguration build() {
+            return new MigratorConfiguration(kafkaBrokers, transaction, isVersion8Processor,
+                                            processorDescriptor, propertyXpathDescriptor);
+        }
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java
new file mode 100644
index 0000000000..ad0aa453b0
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlowPropertyXpathDescriptor implements PropertyXpathDescriptor {
+
+    private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
+    private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
+    private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES;
+    static {
+        CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
+        CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"honor-transactions\"]/value");
+        CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions");
+        PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
+        PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"use-transactions\"]/value");
+        PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions");
+        TRANSACTION_PROPERTIES = new HashMap<>();
+        TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES);
+        TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES);
+    }
+
+    private final KafkaProcessorType processorType;
+
+    public FlowPropertyXpathDescriptor(final KafkaProcessorType processorType) {
+        this.processorType = processorType;
+    }
+
+    @Override
+    public String getXpathForProperties() {
+        return "property";
+    }
+
+    @Override
+    public String getPropertyKeyTagName() {
+        return "name";
+    }
+
+    @Override
+    public String getPropertyTagName() {
+        return "property";
+    }
+
+    @Override
+    public String getXpathForTransactionProperty() {
+        return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
+    }
+
+    @Override
+    public String getTransactionTagName() {
+        return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java
new file mode 100644
index 0000000000..11f507aaec
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaProcessorDescriptor implements ProcessorDescriptor {
+    private static final Map<String, String> CONSUME_KAFKA_PROCESSOR_PROPERTIES;
+    private static final Map<String, String> CONSUME_PROPERTIES_TO_BE_SAVED;
+    private static final Map<String, String> PUBLISH_KAFKA_PROCESSOR_PROPERTIES;
+    private static final Map<String, String> PUBLISH_PROPERTIES_TO_BE_SAVED;
+    private static final Map<String, String> CONTROLLER_SERVICES;
+    private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES;
+    private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES_TO_BE_SAVED;
+
+    static {
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol", "PLAINTEXT");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic_type", "names");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("group.id", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("auto.offset.reset", "latest");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("separate-by-key", "false");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("header-name-regex", null);
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max.poll.records", "10000");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max-uncommit-offset-wait", "1 secs");
+        CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("Communications Timeout", "60 secs");
+
+        CONSUME_PROPERTIES_TO_BE_SAVED = new HashMap<>();
+        CONSUME_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
+        CONSUME_PROPERTIES_TO_BE_SAVED.put("Group ID", "group.id");
+
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol", "PLAINTEXT");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("acks", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("Failure Strategy", "Route to Failure");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("transactional-id-prefix", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("attribute-name-regex", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kafka-key", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.request.size", "1 MB");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ack.wait.time", "5 secs");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.block.ms", "5 sec");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partition", null);
+        PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("compression.type", null);
+
+        PUBLISH_PROPERTIES_TO_BE_SAVED = new HashMap<>();
+        PUBLISH_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
+        PUBLISH_PROPERTIES_TO_BE_SAVED.put("Partition", "partition");
+        PUBLISH_PROPERTIES_TO_BE_SAVED.put("Kafka Key", "kafka-key");
+        PUBLISH_PROPERTIES_TO_BE_SAVED.put("Delivery Guarantee", "acks");
+        PUBLISH_PROPERTIES_TO_BE_SAVED.put( "Compression Codec", "compression.type");
+
+        CONTROLLER_SERVICES = new HashMap<>();
+        CONTROLLER_SERVICES.put("kerberos-credentials-service", "org.apache.nifi.kerberos.KerberosCredentialsService");
+        CONTROLLER_SERVICES.put("ssl.context.service", "org.apache.nifi.ssl.SSLContextService");
+
+        PROPERTIES = new HashMap<>();
+        PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_KAFKA_PROCESSOR_PROPERTIES);
+        PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_KAFKA_PROCESSOR_PROPERTIES);
+
+        PROPERTIES_TO_BE_SAVED = new HashMap<>();
+        PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.CONSUME, CONSUME_PROPERTIES_TO_BE_SAVED);
+        PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.PUBLISH, PUBLISH_PROPERTIES_TO_BE_SAVED);
+    }
+
+    private final KafkaProcessorType processorType;
+
+    public KafkaProcessorDescriptor(final KafkaProcessorType processorType) {
+        this.processorType = processorType;
+    }
+
+    @Override
+    public Map<String, String> getProcessorProperties() {
+        return Collections.unmodifiableMap(PROPERTIES.get(processorType));
+    }
+
+    @Override
+    public Map<String, String> getPropertiesToBeSaved() {
+        return Collections.unmodifiableMap(PROPERTIES_TO_BE_SAVED.get(processorType));
+    }
+
+    @Override
+    public Map<String, String> getControllerServicesForTemplates() {
+        return Collections.unmodifiableMap(CONTROLLER_SERVICES);
+    }
+}
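
Reviewer note: the descriptor above pairs a map of new-processor defaults with a map that renames a few legacy property names (for example "Topic Name" to "topic"). The following is a minimal, self-contained sketch of how those two maps appear to combine during migration. The class and method names (SavedPropertiesSketch, merge) are hypothetical and not part of this commit; the empty-to-null conversion mirrors convert() in AbstractKafkaMigrator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not part of the NiFi toolkit.
final class SavedPropertiesSketch {

    // Start from the new processor's defaults, then carry over only the
    // legacy properties that have a known new name. Everything else from
    // the old processor is dropped.
    static Map<String, String> merge(final Map<String, String> defaults,
                                     final Map<String, String> renames,
                                     final Map<String, String> legacyProperties) {
        final Map<String, String> result = new HashMap<>(defaults);
        for (final Map.Entry<String, String> legacy : legacyProperties.entrySet()) {
            final String newName = renames.get(legacy.getKey());
            if (newName != null) {
                final String value = legacy.getValue();
                // An empty legacy value is stored as null (no <value> element).
                result.put(newName, (value == null || value.isEmpty()) ? null : value);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, String> defaults = new HashMap<>();
        defaults.put("topic", null);
        defaults.put("auto.offset.reset", "latest");

        final Map<String, String> renames = new HashMap<>();
        renames.put("Topic Name", "topic");

        final Map<String, String> legacy = new HashMap<>();
        legacy.put("Topic Name", "orders");
        legacy.put("Zookeeper Connection String", "zk:2181");

        System.out.println(merge(defaults, renames, legacy));
    }
}
```

Copying the defaults into a fresh HashMap matters here for the same reason it does in the AbstractKafkaMigrator constructor: the descriptor hands out unmodifiable views, so the migrator needs its own mutable copy.
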
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java
new file mode 100644
index 0000000000..e5f8a5dde7
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+public enum KafkaProcessorType {
+    PUBLISH("Publish"),
+    CONSUME("Consume"),
+    PUT("Put");
+
+    private final String processorType;
+
+    KafkaProcessorType(String processorType) {
+        this.processorType = processorType;
+    }
+
+    public String getProcessorType() {
+        return processorType;
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java
new file mode 100644
index 0000000000..9a5f798b7d
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.Map;
+
+public interface ProcessorDescriptor {
+    Map<String, String> getProcessorProperties();
+    Map<String, String> getPropertiesToBeSaved();
+    Map<String, String> getControllerServicesForTemplates();
+
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java
new file mode 100644
index 0000000000..ffb266f8c5
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+public interface PropertyXpathDescriptor {
+    String getXpathForProperties();
+    String getPropertyKeyTagName();
+    String getPropertyTagName();
+    String getXpathForTransactionProperty();
+    String getTransactionTagName();
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java
new file mode 100644
index 0000000000..3276439375
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TemplatePropertyXpathDescriptor implements PropertyXpathDescriptor {
+
+    private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
+    private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
+    private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES;
+    static {
+        CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
+        CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"honor-transactions\"]/value");
+        CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions");
+        PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
+        PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"use-transactions\"]/value");
+        PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions");
+        TRANSACTION_PROPERTIES = new HashMap<>();
+        TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES);
+        TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES);
+    }
+
+    private final KafkaProcessorType processorType;
+
+    public TemplatePropertyXpathDescriptor(final KafkaProcessorType processorType) {
+        this.processorType = processorType;
+    }
+
+    @Override
+    public String getXpathForProperties() {
+        return "entry";
+    }
+
+    @Override
+    public String getPropertyKeyTagName() {
+        return "key";
+    }
+
+    @Override
+    public String getPropertyTagName() {
+        return "entry";
+    }
+
+    @Override
+    public String getXpathForTransactionProperty() {
+        return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
+    }
+
+    @Override
+    public String getTransactionTagName() {
+        return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
+    }
+}
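
Reviewer note: the XPath strings returned by this descriptor are relative expressions, evaluated against a template's properties element. Below is a minimal sketch, using only the JDK's javax.xml APIs, of how such an expression selects the value element of one entry and how its text can be overwritten. The class name, method name, and the sample XML in the usage note are assumptions made for illustration; the real template layout may contain more elements.

```java
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.io.StringReader;

// Hypothetical illustration only; not part of the NiFi toolkit.
final class TransactionXpathSketch {

    // Parses a <properties> fragment, sets the value selected by the given
    // relative XPath (if present), and returns the value read back afterwards.
    static String setFlag(final String propertiesXml,
                          final String relativeXpath,
                          final boolean transaction) throws Exception {
        final Document document = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(propertiesXml)));
        final Element properties = document.getDocumentElement();

        final XPath xpath = XPathFactory.newInstance().newXPath();
        final Element value = (Element) xpath.evaluate(relativeXpath, properties, XPathConstants.NODE);
        if (value != null) {
            value.setTextContent(Boolean.toString(transaction));
        }
        // The two-argument evaluate returns the string value, "" when nothing matches.
        return xpath.evaluate(relativeXpath, properties);
    }
}
```

For example, calling setFlag with the fragment `<properties><entry><key>honor-transactions</key><value>true</value></entry></properties>`, the expression `entry[key="honor-transactions"]/value`, and `false` rewrites the stored value to "false". When the entry is absent, the commit's AbstractKafkaMigrator adds a new property instead, which this sketch does not cover.
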
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java
new file mode 100644
index 0000000000..ec02bff12e
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractKafkaMigrator implements Migrator {
+    static final XPath XPATH = XPathFactory.newInstance().newXPath();
+    private final static String NEW_KAFKA_PROCESSOR_VERSION = "_2_0";
+    private final static String ARTIFACT = "nifi-kafka-2-0-nar";
+    private final static String PATH_FOR_ARTIFACT = "bundle/artifact";
+
+    final boolean isVersion8Processor;
+    final boolean isKafkaBrokersPresent;
+
+    final Map<String, String> kafkaProcessorProperties;
+    final Map<String, String> propertiesToBeSaved;
+    final Map<String, String> controllerServices;
+
+    final String xpathForProperties;
+    final String propertyKeyTagName;
+    final String propertyTagName;
+
+    final String xpathForTransactionProperty;
+    final String transactionTagName;
+    final boolean transaction;
+
+
+    public AbstractKafkaMigrator(final MigratorConfiguration configuration) {
+        final String kafkaBrokers = configuration.getKafkaBrokers();
+        this.isKafkaBrokersPresent = !kafkaBrokers.isEmpty();
+        this.isVersion8Processor = configuration.isVersion8Processor();
+        this.kafkaProcessorProperties = new HashMap<>(configuration.getProcessorDescriptor().getProcessorProperties());
+        this.propertiesToBeSaved = configuration.getProcessorDescriptor().getPropertiesToBeSaved();
+        this.controllerServices = configuration.getProcessorDescriptor().getControllerServicesForTemplates();
+        this.xpathForProperties = configuration.getPropertyXpathDescriptor().getXpathForProperties();
+        this.propertyKeyTagName = configuration.getPropertyXpathDescriptor().getPropertyKeyTagName();
+        this.propertyTagName = configuration.getPropertyXpathDescriptor().getPropertyTagName();
+        this.xpathForTransactionProperty = configuration.getPropertyXpathDescriptor().getXpathForTransactionProperty();
+        this.transactionTagName = configuration.getPropertyXpathDescriptor().getTransactionTagName();
+        this.transaction = configuration.isTransaction();
+
+        if (isKafkaBrokersPresent) {
+            kafkaProcessorProperties.put("bootstrap.servers", kafkaBrokers);
+        }
+    }
+
+    @Override
+    public void configureProperties(final Node node) throws XPathExpressionException {
+        if (isVersion8Processor && isKafkaBrokersPresent) {
+            final NodeList properties = (NodeList) XPATH.evaluate(xpathForProperties, node, XPathConstants.NODESET);
+            for (int i = 0; i < properties.getLength(); i++) {
+                final Node property = properties.item(i);
+                saveRequiredProperties(property);
+                removeElement(node, property);
+            }
+            addNewProperties(node);
+        }
+    }
+
+    @Override
+    public void configureDescriptors(final Node node) throws XPathExpressionException {
+        if(isVersion8Processor && isKafkaBrokersPresent) {
+            final Element descriptorElement = (Element) XPATH.evaluate("config/descriptors", node, XPathConstants.NODE);
+            final NodeList descriptors = (NodeList) XPATH.evaluate("entry", descriptorElement, XPathConstants.NODESET);
+            for (int i = 0; i < descriptors.getLength(); i++) {
+                final Node descriptor = descriptors.item(i);
+                removeElement(descriptorElement, descriptor);
+            }
+            addNewDescriptors(descriptorElement);
+        }
+    }
+
+    @Override
+    public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
+        final String transactionString = Boolean.toString(transaction);
+        final Element transactionsElement = (Element) XPATH.evaluate(xpathForTransactionProperty, node, XPathConstants.NODE);
+
+        if (transactionsElement != null) {
+            transactionsElement.setTextContent(transactionString);
+        } else {
+            addNewProperty(node, transactionTagName, transactionString);
+        }
+
+        kafkaProcessorProperties.put(transactionTagName, transactionString);
+    }
+
+    public void replaceClassName(final Element className) {
+        final String processorName = StringUtils.substringAfterLast(className.getTextContent(), ".");
+        final String newClassName = replaceClassNameWithNewProcessorName(className.getTextContent(), processorName);
+        className.setTextContent(newClassName);
+    }
+
+    public void replaceArtifact(final Node processor) throws XPathExpressionException {
+        ((Element) XPATH.evaluate(PATH_FOR_ARTIFACT, processor, XPathConstants.NODE)).setTextContent(ARTIFACT);
+    }
+
+    private static String replaceClassNameWithNewProcessorName(final String className, final String processorName) {
+        final String newProcessorName = StringUtils.replaceEach(processorName, new String[]{"Get", "Put"}, new String[]{"pubsub.Consume", "pubsub.Publish"});
+        final String processorNameWithNewVersion =
+                newProcessorName.replaceFirst("$|_0_1\\d?", NEW_KAFKA_PROCESSOR_VERSION);
+        return StringUtils.replace(className, processorName, processorNameWithNewVersion);
+    }
+
+    private void addNewDescriptors(final Node node) {
+        for (String key: kafkaProcessorProperties.keySet()) {
+            final Element descriptorElement = node.getOwnerDocument().createElement("entry");
+            node.appendChild(descriptorElement);
+
+            final Element descriptorKeyElement = descriptorElement.getOwnerDocument().createElement("key");
+            descriptorKeyElement.setTextContent(key);
+            descriptorElement.appendChild(descriptorKeyElement);
+
+            final Element descriptorValueElement = descriptorElement.getOwnerDocument().createElement("value");
+            descriptorElement.appendChild(descriptorValueElement);
+
+            final Element descriptorNameElement = descriptorValueElement.getOwnerDocument().createElement("name");
+            descriptorNameElement.setTextContent(key);
+            descriptorValueElement.appendChild(descriptorNameElement);
+
+            if (controllerServices.containsKey(key)) {
+                final Element controllerServiceElement = descriptorValueElement.getOwnerDocument().createElement("identifiesControllerService");
+                controllerServiceElement.setTextContent(controllerServices.get(key));
+                descriptorValueElement.appendChild(controllerServiceElement);
+            }
+        }
+    }
+
+    private void saveRequiredProperties(final Node property) throws XPathExpressionException {
+        final String propertyToBeSaved = propertiesToBeSaved.get(XPATH.evaluate(propertyKeyTagName, property));
+
+        if (propertyToBeSaved != null) {
+            String propertyValue = XPATH.evaluate("value", property);
+            kafkaProcessorProperties.put(propertyToBeSaved, convert(propertyValue));
+        }
+    }
+
+    private String convert(final String propertyValue) {
+        return propertyValue.isEmpty() ? null : propertyValue;
+    }
+
+    private void addNewProperties(final Node node) {
+        for (Map.Entry<String, String> entry : kafkaProcessorProperties.entrySet()) {
+            addNewProperty(node, entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void addNewProperty(final Node node, final String key, final String value) {
+        final Element propertyElement = node.getOwnerDocument().createElement(propertyTagName);
+        node.appendChild(propertyElement);
+
+        final Element propertyKeyElement = propertyElement.getOwnerDocument().createElement(propertyKeyTagName);
+        propertyKeyElement.setTextContent(key);
+
+        propertyElement.appendChild(propertyKeyElement);
+
+        if (value != null) {
+            final Element propertyValueElement = propertyElement.getOwnerDocument().createElement("value");
+            propertyValueElement.setTextContent(value);
+
+            propertyElement.appendChild(propertyValueElement);
+        }
+    }
+
+    private void removeElement(final Node node, final Node element) {
+        node.removeChild(element);
+    }
+}
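
Reviewer note: the regular expression in replaceClassNameWithNewProcessorName, `$|_0_1\d?`, is compact enough to deserve a worked example. With replaceFirst, the leftmost match wins: a name carrying a `_0_1x` suffix has that suffix replaced, while a name without one only matches `$` at the end, so the new version is appended. The sketch below reproduces the behaviour with plain JDK string methods; the class name is hypothetical, and the two chained replace calls are a simplification of StringUtils.replaceEach that gives the same result for these inputs.

```java
// Hypothetical illustration only; not part of the NiFi toolkit.
final class ClassNameRewriteSketch {

    private static final String NEW_VERSION = "_2_0";

    static String rewrite(final String className) {
        // Simple class name, e.g. "GetKafka" or "ConsumeKafka_0_10".
        final String processorName = className.substring(className.lastIndexOf('.') + 1);

        // Legacy Get/Put processors move to the pubsub package under new names.
        final String renamed = processorName
                .replace("Get", "pubsub.Consume")
                .replace("Put", "pubsub.Publish");

        // Replace an existing _0_1x suffix, or append the version when none exists.
        final String versioned = renamed.replaceFirst("$|_0_1\\d?", NEW_VERSION);

        return className.replace(processorName, versioned);
    }

    public static void main(final String[] args) {
        System.out.println(rewrite("org.apache.nifi.processors.kafka.GetKafka"));
        System.out.println(rewrite("org.apache.nifi.processors.kafka.PutKafka"));
        System.out.println(rewrite("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10"));
    }
}
```

Under this reading, `org.apache.nifi.processors.kafka.GetKafka` becomes `org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0`, and `...pubsub.ConsumeKafka_0_10` becomes `...pubsub.ConsumeKafka_2_0`.
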
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java
new file mode 100644
index 0000000000..022c2db933
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathExpressionException;
+
+public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator {
+
+    public ConsumeKafkaFlowMigrator(final MigratorConfiguration configuration) {
+        super(configuration);
+    }
+
+    @Override
+    public void migrate(final Element className, final Node processor) throws XPathExpressionException {
+        configureProperties(processor);
+        configureComponentSpecificSteps(processor);
+        replaceClassName(className);
+        replaceArtifact(processor);
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java
new file mode 100644
index 0000000000..3abb118a3c
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+
+public class ConsumeKafkaTemplateMigrator extends AbstractKafkaMigrator {
+
+    public ConsumeKafkaTemplateMigrator(final MigratorConfiguration configuration) {
+        super(configuration);
+    }
+
+    @Override
+    public void configureProperties(final Node node) throws XPathExpressionException {
+        final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+        super.configureProperties(propertyElement);
+    }
+
+    @Override
+    public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
+        final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+        super.configureComponentSpecificSteps(propertyElement);
+    }
+
+    @Override
+    public void migrate(final Element className, final Node processor) throws XPathExpressionException {
+        configureProperties(processor);
+        configureComponentSpecificSteps(processor);
+        configureDescriptors(processor);
+        replaceClassName(className);
+        replaceArtifact(processor);
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java
new file mode 100644
index 0000000000..cf41451099
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathExpressionException;
+
+public interface Migrator {
+    void configureProperties(final Node node) throws XPathExpressionException;
+    void configureDescriptors(final Node node) throws XPathExpressionException;
+    void configureComponentSpecificSteps(final Node node) throws XPathExpressionException;
+    void migrate(final Element className, final Node node) throws XPathExpressionException;
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java
new file mode 100644
index 0000000000..7ef865d25c
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+
+public class PublishKafkaFlowMigrator extends AbstractKafkaMigrator {
+
+    public PublishKafkaFlowMigrator(final MigratorConfiguration configuration) {
+        super(configuration);
+    }
+
+    @Override
+    public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
+        final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("property[name=\"acks\"]/value", node, XPathConstants.NODE);
+        if (this.transaction && deliveryGuaranteeValue != null) {
+            deliveryGuaranteeValue.setTextContent("all");
+        }
+        super.configureComponentSpecificSteps(node);
+    }
+
+    @Override
+    public void migrate(final Element className, final Node processor) throws XPathExpressionException {
+        configureProperties(processor);
+        configureComponentSpecificSteps(processor);
+        replaceClassName(className);
+        replaceArtifact(processor);
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java
new file mode 100644
index 0000000000..7b0538462a
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+
+public class PublishKafkaTemplateMigrator extends AbstractKafkaMigrator {
+
+    public PublishKafkaTemplateMigrator(final MigratorConfiguration configuration) {
+        super(configuration);
+    }
+
+    @Override
+    public void configureProperties(final Node node) throws XPathExpressionException {
+        final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+        super.configureProperties(propertyElement);
+    }
+
+    @Override
+    public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
+        //add value if null
+        final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+        final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("entry[key=\"acks\"]/value", propertyElement, XPathConstants.NODE);
+        if (this.transaction && deliveryGuaranteeValue != null) {
+            deliveryGuaranteeValue.setTextContent("all");
+        }
+        super.configureComponentSpecificSteps(propertyElement);
+    }
+
+    @Override
+    public void migrate(final Element className, final Node processor) throws XPathExpressionException {
+        configureProperties(processor);
+        configureComponentSpecificSteps(processor);
+        configureDescriptors(processor);
+        replaceClassName(className);
+        replaceArtifact(processor);
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
new file mode 100644
index 0000000000..f2cb98d19b
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
+
+
+public class KafkaFlowMigrationService implements KafkaMigrationService {
+    private static final String XPATH_FOR_PROCESSORS_IN_FLOW = ".//processor";
+    private static final String CLASS_TAG_NAME = "class";
+
+    public KafkaFlowMigrationService() {
+    }
+
+    @Override
+    public String getPathForProcessors() {
+        return XPATH_FOR_PROCESSORS_IN_FLOW;
+    }
+
+    @Override
+    public String getPathForClass() {
+        return CLASS_TAG_NAME;
+    }
+
+    @Override
+    public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+                            .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+        return new PublishKafkaFlowMigrator(configurationBuilder.build());
+    }
+
+    @Override
+    public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+                            .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+        return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
+    }
+
+    @Override
+    public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+                            .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+        return new PublishKafkaFlowMigrator(configurationBuilder.build());
+    }
+
+    @Override
+    public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+                            .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+        return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java
new file mode 100644
index 0000000000..1428002077
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+
+public interface KafkaMigrationService {
+
+     String REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES = "(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?";
+     boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
+     boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
+
+    String getPathForProcessors();
+    String getPathForClass();
+    Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
+    Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
+    Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
+    Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
+
+    default void replaceKafkaProcessors(final Document document, final MigratorConfigurationBuilder configurationBuilder) throws XPathExpressionException {
+        Migrator migrator;
+        final XPath xPath = XPathFactory.newInstance().newXPath();
+
+        final NodeList processors = (NodeList) xPath.evaluate(getPathForProcessors(), document, XPathConstants.NODESET);
+        for (int i = 0; i < processors.getLength(); i++) {
+            final Node processor = processors.item(i);
+            final Element className = ((Element) xPath.evaluate(getPathForClass(), processor, XPathConstants.NODE));
+            final String processorName = StringUtils.substringAfterLast(className.getTextContent(), ".");
+
+            if (processorName.matches(REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES)) {
+                if (processorName.contains(KafkaProcessorType.PUBLISH.getProcessorType())) {
+                    migrator = createPublishMigrator(configurationBuilder);
+                } else if (processorName.contains(KafkaProcessorType.PUT.getProcessorType())) {
+                    migrator = createVersionEightPublishMigrator(configurationBuilder);
+                } else if (processorName.contains(KafkaProcessorType.CONSUME.getProcessorType())) {
+                    migrator = createConsumeMigrator(configurationBuilder);
+                } else {
+                    migrator = createVersionEightConsumeMigrator(configurationBuilder);
+                }
+
+                migrator.migrate(className, processor);
+            }
+        }
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java
new file mode 100644
index 0000000000..fa4b06d00a
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
+
+public class KafkaTemplateMigrationService implements KafkaMigrationService {
+    private static final String XPATH_FOR_PROCESSORS_IN_TEMPLATE = ".//processors";
+    private static final String TYPE_TAG_NAME = "type";
+
+    public KafkaTemplateMigrationService() {
+    }
+
+    @Override
+    public String getPathForProcessors() {
+        return XPATH_FOR_PROCESSORS_IN_TEMPLATE;
+    }
+
+    @Override
+    public String getPathForClass() {
+        return TYPE_TAG_NAME;
+    }
+
+    @Override
+    public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+                            .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+        return new PublishKafkaTemplateMigrator(configurationBuilder.build());
+    }
+
+    @Override
+    public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+                            .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+        return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
+    }
+
+    @Override
+    public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+                            .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+        return new PublishKafkaTemplateMigrator(configurationBuilder.build());
+    }
+
+    @Override
+    public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+        configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+                            .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+                            .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+        return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java
new file mode 100644
index 0000000000..5c3a54fb9c
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.junit.jupiter.api.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class KafkaMigrationServiceTest {
+
+    private static final List<String> EXPECTED_CLASS_OR_TYPE_NAMES =
+        Arrays.asList("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0",
+                "org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0",
+                "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0");
+
+    private static final List<String> EXPECTED_ARTIFACTS =
+            Arrays.asList("nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar");
+
+    private static final MigratorConfigurationBuilder CONFIGURATION_BUILDER =
+            new MigratorConfigurationBuilder().setKafkaBrokers("kafkaBrokers, localhost:1234")
+                                            .setTransaction(Boolean.FALSE);
+    private static final XPath XPATH = XPathFactory.newInstance().newXPath();
+    private static final String PATH_FOR_PROCESSORS_IN_FLOWS = ".//processor";
+    private static final String PATH_FOR_PROCESSORS_IN_TEMPLATES = ".//processors";
+    private static final String PATH_FOR_CLASS_ELEMENT = "class";
+    private static final String PATH_FOR_TYPE_ELEMENT = "type";
+    private static final String PATH_FOR_ARTIFACT_ELEMENT = "bundle/artifact";
+
+
+    @Test
+    public void testClassReplacement() throws XPathExpressionException, IOException {
+        final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final List<String> originalClassNames = createClassResultList(document);
+
+        kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+        final List<String> actualClassNames = createClassResultList(document);
+
+        assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualClassNames, originalClassNames);
+    }
+
+    @Test
+    public void testTypeReplacement() throws XPathExpressionException, IOException {
+        final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final List<String> originalTypeNames = createTypeResultList(document);
+
+        kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+        final List<String> actualTypeNames = createTypeResultList(document);
+
+        assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualTypeNames, originalTypeNames);
+    }
+
+    @Test
+    public void testArtifactReplacementInTemplate() throws XPathExpressionException, IOException {
+        final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final List<String> originalArtifacts = createArtifactResultListForTemplate(document);
+
+        kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+        final List<String> actualArtifacts = createArtifactResultListForTemplate(document);
+
+        assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
+    }
+
+    @Test
+    public void testArtifactReplacementInFlow() throws XPathExpressionException, IOException {
+        final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final List<String> originalArtifacts = createArtifactResultListForFlow(document);
+
+        kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+        final List<String> actualArtifacts = createArtifactResultListForFlow(document);
+
+        assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
+    }
+
+    private List<String> createClassResultList(final Document document) throws XPathExpressionException {
+        return createProcessorResultListForFlow(document, PATH_FOR_CLASS_ELEMENT);
+    }
+
+    private List<String> createArtifactResultListForFlow(final Document document) throws XPathExpressionException {
+        return createProcessorResultListForFlow(document, PATH_FOR_ARTIFACT_ELEMENT);
+    }
+
+    private List<String> createTypeResultList(final Document document) throws XPathExpressionException {
+        return createProcessorResultListForTemplate(document, PATH_FOR_TYPE_ELEMENT);
+    }
+
+    private List<String> createArtifactResultListForTemplate(final Document document) throws XPathExpressionException {
+        return createProcessorResultListForTemplate(document, PATH_FOR_ARTIFACT_ELEMENT);
+    }
+
+    private List<String> createProcessorResultListForFlow(final Document document, final String elementPath) throws XPathExpressionException {
+        return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_FLOWS, elementPath);
+    }
+
+    private List<String> createProcessorResultListForTemplate(final Document document, final String elementPath) throws XPathExpressionException {
+        return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_TEMPLATES, elementPath);
+    }
+
+    private List<String> createProcessorResultList(final Document document, final String processorPath, final String elementPath) throws XPathExpressionException {
+        final List<String> resultList = new ArrayList<>();
+        final NodeList processors = (NodeList) XPATH.evaluate(processorPath, document, XPathConstants.NODESET);
+        for (int i = 0; i < processors.getLength(); i++) {
+            resultList.add(XPATH.evaluate(elementPath, processors.item(i)));
+        }
+        return resultList;
+    }
+
+    private void assertSuccess(final List<String> expectedArtifacts, final List<String> actualArtifacts, final List<String> originalArtifacts) {
+        assertArrayEquals(expectedArtifacts.toArray(), actualArtifacts.toArray());
+        assertNoReplacementHappened(originalArtifacts, actualArtifacts);
+        assertReplacementHappened(originalArtifacts, actualArtifacts);
+    }
+
+    private void assertNoReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
+        assertEquals(originalArtifacts.get(0), actualArtifacts.get(0));
+    }
+
+    private void assertReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
+        assertNotEquals(originalArtifacts.get(1), actualArtifacts.get(1));
+        assertNotEquals(originalArtifacts.get(2), actualArtifacts.get(2));
+    }
+}
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java
new file mode 100644
index 0000000000..b095e1cbc4
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.xml.processing.parsers.DocumentProvider;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.w3c.dom.Document;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class KafkaMigrationUtil {
+    public static Document parseDocument() throws IOException {
+        final DocumentProvider documentProvider = new StandardDocumentProvider();
+        return documentProvider.parse(Files.newInputStream(Paths.get("src/test/resources/flow.xml")));
+    }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java
new file mode 100644
index 0000000000..6394516ece
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.junit.jupiter.api.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaMigratorTest {
+    private static final XPath XPATH = XPathFactory.newInstance().newXPath();
+    private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.PutKafka']";
+    private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.PutKafka']";
+
+    private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
+    private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
+    private static final KafkaProcessorDescriptor PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR = new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH);
+    private static final boolean WITH_TRANSACTION = Boolean.TRUE;
+    private static final boolean WITHOUT_TRANSACTION = Boolean.FALSE;
+    private static final boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
+    private static final boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
+    private static final String FLOW = "Flow";
+    private static final String TEMPLATE = "Template";
+
+    @Test
+    public void testPropertiesRemoved() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+        final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+        flowMigrator.configureProperties(processor);
+
+        assertPropertyRemoveSuccess(processor);
+    }
+
+    @Test
+    public void testPropertiesAdded() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,KafkaProcessorType.PUBLISH, FLOW);
+        final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+        flowMigrator.configureProperties(processor);
+
+        assertPropertyAddSuccess(processor);
+    }
+
+    @Test
+    public void testPropertiesSaved() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+        final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+        final List<String> oldValues = getOldValues(processor);
+
+        flowMigrator.configureProperties(processor);
+
+        final List<String> newValues = getNewValues(processor);
+        assertEquals(oldValues, newValues);
+    }
+
+    @Test
+    public void testDescriptorsAdded() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+        final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+        templateMigrator.configureDescriptors(processor);
+
+        assertDescriptorAddSuccess(processor);
+    }
+
+    @Test
+    public void testDescriptorsRemoved() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+        final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+        templateMigrator.configureDescriptors(processor);
+
+        assertDescriptorRemoveSuccess(processor);
+    }
+
+    @Test
+    public void testTransactionFlowPropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
+        final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+        flowMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("true", XPATH.evaluate("property[name='honor-transactions']/value", processor));
+    }
+
+    @Test
+    public void testTransactionTemplatePropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
+        final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+        templateMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("true", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
+    }
+
+    @Test
+    public void testTransactionFlowPropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
+        final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+        flowMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("false", XPATH.evaluate("property[name='honor-transactions']/value", processor));
+    }
+
+    @Test
+    public void testTransactionTemplatePropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
+        final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+        templateMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("false", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
+    }
+
+    @Test
+    public void testTransactionFlowPropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+        final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+        flowMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("true", XPATH.evaluate("property[name='use-transactions']/value", processor));
+        assertEquals("", XPATH.evaluate("property[name='acks']/value", processor));
+
+    }
+
+    @Test
+    public void testTransactionTemplatePropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+        final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+        templateMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("true", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor));
+        assertEquals("", XPATH.evaluate("config/properties/entry[key='acks']/value", processor));
+    }
+
+    @Test
+    public void testTransactionFlowPropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+        final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+        flowMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("false", XPATH.evaluate("property[name='use-transactions']/value", processor));
+    }
+
+    @Test
+    public void testTransactionTemplatePropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException {
+        final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+        final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+        final Document document = KafkaMigrationUtil.parseDocument();
+        final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+        templateMigrator.configureComponentSpecificSteps(processor);
+
+        assertEquals("false", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor));
+    }
+
+
+    private List<String> getValues(final Collection<String> properties, final Node node) throws XPathExpressionException {
+        final List<String> result = new ArrayList<>();
+        for (String propertyName : properties) {
+            result.add(XPATH.evaluate(String.format("property[name='%s']/value", propertyName), node));
+        }
+        return result;
+    }
+
+    private List<String> getOldValues(final Node node) throws XPathExpressionException {
+        return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().keySet(), node);
+    }
+
+    private List<String> getNewValues(final Node node) throws XPathExpressionException {
+        return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().values(), node);
+    }
+
+    private void assertPropertyRemoveSuccess(final Node node) throws XPathExpressionException {
+        assertTrue(XPATH.evaluate("property[name='Known Brokers']", node).isEmpty());
+    }
+
+    private void assertDescriptorRemoveSuccess(final Node node) throws XPathExpressionException {
+        assertTrue(XPATH.evaluate("config/descriptors/entry[key='Known Brokers']", node).isEmpty());
+    }
+
+    private void assertAddSuccess(final String xpath, final Node node) throws XPathExpressionException {
+        for (String propertyName: PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getProcessorProperties().keySet()) {
+            assertFalse(XPATH.evaluate(String.format(xpath, propertyName), node).isEmpty());
+        }
+    }
+    private void assertPropertyAddSuccess(final Node node) throws XPathExpressionException {
+        assertAddSuccess("property[name='%s']/name", node);
+    }
+
+    private void assertDescriptorAddSuccess(final Node node) throws XPathExpressionException {
+        assertAddSuccess("config/descriptors/entry[key='%s']/key", node);
+    }
+
+    private MigratorConfiguration getConfiguration(final boolean transaction, final boolean isVersion8Processor,
+                                                   final KafkaProcessorType processorType, final String migrationType) {
+        final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder();
+        final PropertyXpathDescriptor propertyXpathDescriptor;
+
+        if (migrationType.equalsIgnoreCase("Flow")) {
+            propertyXpathDescriptor = new FlowPropertyXpathDescriptor(processorType);
+        } else {
+             propertyXpathDescriptor= new TemplatePropertyXpathDescriptor(processorType);
+        }
+
+        return configurationBuilder.setKafkaBrokers("kafkaBrokers, localhost:1234")
+                                    .setTransaction(transaction)
+                                    .setIsVersion8Processor(isVersion8Processor)
+                                    .setProcessorDescriptor(new KafkaProcessorDescriptor(processorType))
+                                    .setPropertyXpathDescriptor(propertyXpathDescriptor)
+                                    .build();
+    }
+}
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml
new file mode 100644
index 0000000000..b1cc3b5216
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml
@@ -0,0 +1,136 @@
+<rootGroup>
+    <processGroup>
+        <processor>
+            <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</class>
+            <bundle>
+                <group>org.apache.nifi</group>
+                <artifact>nifi-kafka-2-0-nar</artifact>
+                <version>1.13.2.2.1.2.0-283</version>
+            </bundle>
+            <property>
+                <name>bootstrap.servers</name>
+                <value>localhost:9092</value>
+            </property>
+        </processor>
+        <processor>
+            <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</class>
+            <bundle>
+                <group>org.apache.nifi</group>
+                <artifact>nifi-kafka-0-10-nar</artifact>
+                <version>1.13.2.2.1.2.0-283</version>
+            </bundle>
+            <property>
+                <name>bootstrap.servers</name>
+                <value>localhost:9092</value>
+            </property>
+        </processor>
+        <processor>
+            <class>org.apache.nifi.processors.kafka.PutKafka</class>
+            <bundle>
+                <group>org.apache.nifi</group>
+                <artifact>nifi-kafka-0-8-nar</artifact>
+                <version>1.13.2.2.1.2.0-283</version>
+            </bundle>
+            <property>
+                <name>Known Brokers</name>
+            </property>
+            <property>
+                <name>Topic Name</name>
+                <value>test-topic</value>
+            </property>
+            <property>
+                <name>Partition</name>
+                <value>test-partition</value>
+            </property>
+            <property>
+                <name>Kafka Key</name>
+                <value>kafka-key</value>
+            </property>
+            <property>
+                <name>Delivery Guarantee</name>
+                <value>1</value>
+            </property>
+            <property>
+                <name>Compression Codec</name>
+                <value>gzip</value>
+            </property>
+        </processor>
+    </processGroup>
+    <template encoding-version="1.3">
+        <snippet>
+            <processGroups>
+                <contents>
+                    <processors>
+                        <bundle>
+                            <artifact>nifi-kafka-2-0-nar</artifact>
+                            <group>org.apache.nifi</group>
+                            <version>1.13.2.2.1.2.0-283</version>
+                        </bundle>
+                        <config>
+                            <descriptors>
+                                <entry>
+                                    <key>Known Brokers</key>
+                                    <value>
+                                        <name>Known Brokers</name>
+                                    </value>
+                                </entry>
+                            </descriptors>
+                            <properties>
+                                <entry>
+                                    <key>Known Brokers</key>
+                                </entry>
+                            </properties>
+                        </config>
+                        <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</type>
+                    </processors>
+                    <processors>
+                        <bundle>
+                            <artifact>nifi-kafka-0-10-nar</artifact>
+                            <group>org.apache.nifi</group>
+                            <version>1.13.2.2.1.2.0-283</version>
+                        </bundle>
+                        <config>
+                            <descriptors>
+                                <entry>
+                                    <key>Known Brokers</key>
+                                    <value>
+                                        <name>Known Brokers</name>
+                                    </value>
+                                </entry>
+                            </descriptors>
+                            <properties>
+                                <entry>
+                                    <key>Known Brokers</key>
+                                </entry>
+                            </properties>
+                        </config>
+                        <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</type>
+                    </processors>
+                    <processors>
+                        <bundle>
+                            <artifact>nifi-kafka-0-8-nar</artifact>
+                            <group>org.apache.nifi</group>
+                            <version>1.13.2.2.1.2.0-283</version>
+                        </bundle>
+                        <config>
+                            <descriptors>
+                                <entry>
+                                    <key>Known Brokers</key>
+                                    <value>
+                                        <name>Known Brokers</name>
+                                    </value>
+                                </entry>
+                            </descriptors>
+                            <properties>
+                                <entry>
+                                    <key>Known Brokers</key>
+                                </entry>
+                            </properties>
+                        </config>
+                        <type>org.apache.nifi.processors.kafka.PutKafka</type>
+                    </processors>
+                </contents>
+            </processGroups>
+        </snippet>
+    </template>
+</rootGroup>
\ No newline at end of file
diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml
index d4d8c1f63b..a65c649c04 100644
--- a/nifi-toolkit/pom.xml
+++ b/nifi-toolkit/pom.xml
@@ -33,6 +33,7 @@
         <module>nifi-toolkit-flowanalyzer</module>
         <module>nifi-toolkit-cli</module>
         <module>nifi-toolkit-api</module>
+        <module>nifi-toolkit-kafka-migrator</module>
     </modules>
     <properties>
         <toolkit.groovy.version>3.0.8</toolkit.groovy.version>
