This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e304ac586 NIFI-11590: Missing Enum data type handling in FlowFileTable
4e304ac586 is described below

commit 4e304ac58649ed5a713dc2ea50990e897eed981c
Author: Mark Bathori <bathori.m...@gmail.com>
AuthorDate: Thu May 25 14:41:42 2023 +0200

    NIFI-11590: Missing Enum data type handling in FlowFileTable
    
    This closes #7294.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../processors/helloworld/HelloStateProcessor.java | 167 +++++++++++++++++++++
 .../processors/helloworld/HelloWorldProcessor.java | 157 +++++++++++++++++++
 .../org/apache/nifi/queryrecord/FlowFileTable.java |   2 +
 .../nifi/processors/standard/TestQueryRecord.java  |  12 +-
 4 files changed, 337 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloStateProcessor.java
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloStateProcessor.java
new file mode 100644
index 0000000000..0e058f467e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloStateProcessor.java
@@ -0,0 +1,167 @@
+package org.apache.nifi.processors.helloworld;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Tags("state")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER, description = "")
+public class HelloStateProcessor extends AbstractProcessor {
+
+    private static final String COUNTER_KEY = "counter";
+    private static final String TIMESTAMP_KEY = "timestamp";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .build();
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        getLogger().info("init");
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        getLogger().info("customValidate");
+        return Collections.emptyList();
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        getLogger().info("onScheduled");
+        if (getNodeTypeProvider().isPrimary()) {
+            final StateManager stateManager = context.getStateManager();
+            final StateMap state = stateManager.getState(Scope.CLUSTER);
+
+            if (!state.getStateVersion().isPresent()) {
+                stateManager.setState(new HashMap<>(), Scope.CLUSTER);
+            }
+        }
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        getLogger().info("onUnscheduled");
+    }
+
+    @OnStopped
+    public void onStopped() {
+        getLogger().info("onStopped");
+    }
+
+    @OnShutdown
+    public void onShutdown() {
+        getLogger().info("onShutdown");
+    }
+
+//    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+//        try {
+//            getLogger().info("onTrigger");
+//
+//            FlowFile flowFile = session.get();
+//            if (flowFile == null) {
+//                getLogger().info("Null FlowFile");
+//                return;
+//            }
+//
+//            StateMap oldState = session.getState(Scope.CLUSTER);
+//
+//            Map<String, String> stateMap = new HashMap<>(oldState.toMap());
+//
+//            String counterStr = stateMap.get(COUNTER_KEY);
+//
+//            int counter = counterStr != null ? Integer.parseInt(counterStr) 
: 0;
+//            counter++;
+//            stateMap.put(COUNTER_KEY, Integer.toString(counter));
+//
+//            stateMap.put(TIMESTAMP_KEY, LocalDateTime.now().toString());
+//
+//            boolean success = session.replaceState(oldState, stateMap, 
Scope.CLUSTER); // reread state
+//
+//            if (success) {
+//                session.transfer(flowFile, REL_SUCCESS);
+//            } else {
+//                session.transfer(flowFile, REL_FAILURE);
+//            }
+//        } catch (Exception e) {
+//            getLogger().error("HelloWorldProcessor failure", e);
+//            throw new ProcessException("HelloWorldProcessor failure", e);
+//        }
+//    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        try {
+            getLogger().info("onTrigger");
+
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                getLogger().info("Null FlowFile");
+                return;
+            }
+
+            StateManager stateManager = context.getStateManager();
+
+            StateMap oldState = stateManager.getState(Scope.CLUSTER);
+
+            Map<String, String> stateMap = new HashMap<>(oldState.toMap());
+
+            String counterStr = stateMap.get(COUNTER_KEY);
+
+            int counter = counterStr != null ? Integer.parseInt(counterStr) : 
0;
+            counter++;
+            stateMap.put(COUNTER_KEY, Integer.toString(counter));
+
+            boolean success = stateManager.replace(oldState, stateMap, 
Scope.CLUSTER);
+
+            if (success) {
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                session.transfer(flowFile, REL_FAILURE);
+
+            }
+        } catch (Exception e) {
+            getLogger().error("HelloWorldProcessor failure", e);
+            throw new ProcessException("HelloWorldProcessor failure", e);
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloWorldProcessor.java
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloWorldProcessor.java
new file mode 100644
index 0000000000..c30c80e039
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/helloworld/HelloWorldProcessor.java
@@ -0,0 +1,157 @@
+package org.apache.nifi.processors.helloworld;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Tags("hello")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class HelloWorldProcessor extends AbstractProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .build();
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        getLogger().info("init");
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        getLogger().info("customValidate");
+        return Collections.emptyList();
+    }
+
+    @OnScheduled
+    public void onScheduled() {
+        getLogger().info("onScheduled");
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        getLogger().info("onUnscheduled");
+    }
+
+    @OnStopped
+    public void onStopped() {
+        getLogger().info("onStopped");
+    }
+
+    @OnShutdown
+    public void onShutdown() {
+        getLogger().info("onShutdown");
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        try {
+            getLogger().info("onTrigger");
+
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                getLogger().info("Null FlowFile");
+                return;
+            }
+
+//            List<FlowFile> flowFiles = session.get(ff -> {
+//                //String uuid = ff.getAttribute(CoreAttributes.UUID.key());
+//                String uuid = ff.getAttribute("my.uuid");
+//                if (uuid != null && uuid.equals("just this")) {
+//                    return 
FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+//                } else {
+//                    return 
FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
+//                }
+//            });
+//            if (flowFiles.isEmpty()) {
+//                getLogger().info("Null FlowFile");
+//                return;
+//            }
+//            FlowFile flowFile = flowFiles.get(0);
+
+            String name = flowFile.getAttribute("name");
+            if (name == null) {
+                name = "Anonymous";
+            }
+
+            if ("Exception".equals(name)) {
+                throw new ProcessException("exception");
+                // => session.rollback(true) in AbstractProcessor (if thrown 
from onTrigger())
+            }
+
+            if ("Yield".equals(name)) {
+                context.yield();
+                session.remove(flowFile);
+                return;
+            }
+
+            if ("Penalize".equals(name)) {
+                session.penalize(flowFile);
+                session.rollback();
+                return;
+                // => no effect, will be rolled back
+                // same as Rollback case below
+                // session.rollback(true) should be used or ProcessException 
should be thrown
+            }
+
+            if ("Rollback".equals(name)) {
+                session.rollback();
+                return;
+                // => retriggered immediately
+            }
+
+            if ("Failure".equals(name)) {
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            String greeting = String.format("Hello %s!", name);
+            getLogger().info(greeting);
+
+            try (OutputStream os = session.write(flowFile)) {
+                os.write(greeting.getBytes());
+            }
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            getLogger().error("HelloWorldProcessor failure", e);
+            throw new ProcessException("HelloWorldProcessor failure", e);
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index a2debbdce5..e462972387 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -227,6 +227,8 @@ public class FlowFileTable extends AbstractTable implements 
QueryableTable, Tran
                 return typeFactory.createJavaType(BigInteger.class);
             case DECIMAL:
                 return typeFactory.createJavaType(BigDecimal.class);
+            case ENUM:
+                return typeFactory.createJavaType(Enum.class);
             case CHOICE:
                 final ChoiceDataType choiceDataType = (ChoiceDataType) 
fieldType;
                 DataType widestDataType = 
choiceDataType.getPossibleSubTypes().get(0);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 20f6dca199..d8a9a1f069 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -251,7 +251,7 @@ public class TestQueryRecord {
         runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
         runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
         runner.setProperty(REL_NAME,
-                "SELECT title, name" +
+                "SELECT title, name, jobLevel" +
                         "    FROM FLOWFILE" +
                         "    WHERE CARDINALITY(addresses) > 1");
 
@@ -270,6 +270,7 @@ public class TestQueryRecord {
         final Record output = written.get(0);
         assertEquals("John Doe", output.getValue("name"));
         assertEquals("Software Engineer", output.getValue("title"));
+        assertEquals(JobLevel.IC2, output.getValue("jobLevel"));
     }
 
     @Test
@@ -777,6 +778,7 @@ public class TestQueryRecord {
      *          {
      *               "name": "John Doe",
      *               "title": "Software Engineer",
+     *               "jobLevel": "IC2",
      *               "age": 40,
      *               "addresses": [{
      *                   "streetNumber": 4820,
@@ -815,6 +817,7 @@ public class TestQueryRecord {
         personFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
         personFields.add(new RecordField("age", 
RecordFieldType.INT.getDataType()));
         personFields.add(new RecordField("title", 
RecordFieldType.STRING.getDataType()));
+        personFields.add(new RecordField("jobLevel", 
RecordFieldType.ENUM.getDataType()));
         personFields.add(new RecordField("height", 
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), 
RecordFieldType.INT.getDataType())));
         personFields.add(new RecordField("addresses", 
RecordFieldType.ARRAY.getArrayDataType( 
RecordFieldType.RECORD.getRecordDataType(addressSchema)) ));
         final RecordSchema personSchema = new SimpleRecordSchema(personFields);
@@ -844,6 +847,7 @@ public class TestQueryRecord {
         map.put("age", 30);
         map.put("height", 60.5);
         map.put("title", "Software Engineer");
+        map.put("jobLevel", JobLevel.IC2);
         map.put("addresses", new Record[] {homeAddress, workAddress});
         return new MapRecord(personSchema, map);
     }
@@ -1281,4 +1285,10 @@ public class TestQueryRecord {
 
     }
 
+    public enum JobLevel {
+        IC1,
+        IC2,
+        IC3
+    }
+
 }

Reply via email to