This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new abc08eb842 NIFI-12796 - PutDatabaseRecord statement type should support u/c/d for Debezium. This closes #8412.
abc08eb842 is described below

commit abc08eb842afe7a033a20fbeb94a08be264b0512
Author: Pierre Villard <pierre.villard...@gmail.com>
AuthorDate: Wed Feb 14 23:06:02 2024 -0500

    NIFI-12796 - PutDatabaseRecord statement type should support u/c/d for Debezium
    This closes #8412.
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../org/apache/nifi/processors/standard/PutDatabaseRecord.java | 10 +++++++++-
 .../apache/nifi/processors/standard/PutDatabaseRecordTest.java |  5 +++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index b12d2c869f..1f60208f40 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -183,7 +183,8 @@ public class PutDatabaseRecord extends AbstractProcessor {
     static final PropertyDescriptor STATEMENT_TYPE_RECORD_PATH = new Builder()
         .name("Statement Type Record Path")
         .displayName("Statement Type Record Path")
-        .description("Specifies a RecordPath to evaluate against each Record 
in order to determine the Statement Type. The RecordPath should equate to 
either INSERT, UPDATE, UPSERT, or DELETE.")
+        .description("Specifies a RecordPath to evaluate against each Record 
in order to determine the Statement Type. The RecordPath should equate to 
either INSERT, UPDATE, UPSERT, or DELETE. "
+                + "(Debezium style operation types are also supported: \"r\" 
and \"c\" for INSERT, \"u\" for UPDATE, and \"d\" for DELETE)")
         .required(true)
         .addValidator(new RecordPathValidator())
         .expressionLanguageSupported(NONE)
@@ -1555,6 +1556,13 @@ public class PutDatabaseRecord extends AbstractProcessor 
{
                 case DELETE_TYPE:
                 case UPSERT_TYPE:
                     return resultValue;
+                case "C":
+                case "R":
+                    return INSERT_TYPE;
+                case "U":
+                    return UPDATE_TYPE;
+                case "D":
+                    return DELETE_TYPE;
             }
 
             throw new ProcessException("Evaluated RecordPath " + 
recordPath.getPath() + " against Record to determine Statement Type but found 
invalid value: " + resultValue);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
index 107db3f688..e6483efc7f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java
@@ -1311,9 +1311,10 @@ public class PutDatabaseRecordTest {
         // CREATE, CREATE, CREATE, DELETE, UPDATE
         parser.addRecord("INSERT", new MapRecord(dataSchema, createValues(1, 
"John Doe", 55)));
         parser.addRecord("INSERT", new MapRecord(dataSchema, createValues(2, 
"Jane Doe", 44)));
-        parser.addRecord("INSERT", new MapRecord(dataSchema, createValues(3, 
"Jim Doe", 2)));
+        parser.addRecord("c", new MapRecord(dataSchema, createValues(3, "Jim 
Doe", 2)));
         parser.addRecord("DELETE", new MapRecord(dataSchema, createValues(2, 
"Jane Doe", 44)));
         parser.addRecord("UPDATE", new MapRecord(dataSchema, createValues(1, 
"John Doe", 201)));
+        parser.addRecord("u", new MapRecord(dataSchema, createValues(3, "Jim 
Doe", 20)));
 
         runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
         runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.USE_RECORD_PATH);
@@ -1337,7 +1338,7 @@ public class PutDatabaseRecordTest {
         assertTrue(rs.next());
         assertEquals(3, rs.getInt(1));
         assertEquals("Jim Doe", rs.getString(2));
-        assertEquals(2, rs.getInt(3));
+        assertEquals(20, rs.getInt(3));
         assertFalse(rs.next());
 
         stmt.close();

Reply via email to