This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.helloworld;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Demo processor that maintains a cluster-scoped counter in the NiFi state
 * manager: for every incoming FlowFile it re-reads the current state,
 * increments the {@code counter} entry, and attempts an optimistic
 * compare-and-swap via {@link StateManager#replace}. The FlowFile is routed to
 * {@code success} when the swap wins and to {@code failure} when another node
 * updated the state concurrently.
 *
 * <p>NOTE(review): this class lives in the slack bundle and is unrelated to
 * NIFI-11590 (Enum handling in FlowFileTable) — it appears to be demo code
 * committed by accident; confirm it is intentional.</p>
 */
@Tags("state")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Stateful(scopes = Scope.CLUSTER, description = "Stores a 'counter' entry that is incremented once per processed FlowFile.")
public class HelloStateProcessor extends AbstractProcessor {

    // State-map key of the per-FlowFile counter.
    private static final String COUNTER_KEY = "counter";

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected void init(ProcessorInitializationContext context) {
        getLogger().info("init");
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        getLogger().info("customValidate");
        return Collections.emptyList();
    }

    /**
     * On the primary node, seeds an empty cluster-scoped state map if no state
     * version exists yet, so later reads always find an initialized map.
     *
     * @throws IOException if the state provider cannot be read or written
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        getLogger().info("onScheduled");
        if (getNodeTypeProvider().isPrimary()) {
            final StateManager stateManager = context.getStateManager();
            final StateMap state = stateManager.getState(Scope.CLUSTER);

            // No version => state has never been written; initialize it once.
            if (!state.getStateVersion().isPresent()) {
                stateManager.setState(new HashMap<>(), Scope.CLUSTER);
            }
        }
    }

    @OnUnscheduled
    public void onUnscheduled() {
        getLogger().info("onUnscheduled");
    }

    @OnStopped
    public void onStopped() {
        getLogger().info("onStopped");
    }

    @OnShutdown
    public void onShutdown() {
        getLogger().info("onShutdown");
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            getLogger().info("onTrigger");

            FlowFile flowFile = session.get();
            if (flowFile == null) {
                getLogger().info("Null FlowFile");
                return;
            }

            StateManager stateManager = context.getStateManager();
            StateMap oldState = stateManager.getState(Scope.CLUSTER);

            // StateMap.toMap() is immutable; copy before mutating.
            Map<String, String> stateMap = new HashMap<>(oldState.toMap());

            String counterStr = stateMap.get(COUNTER_KEY);
            int counter = counterStr != null ? Integer.parseInt(counterStr) : 0;
            counter++;
            stateMap.put(COUNTER_KEY, Integer.toString(counter));

            // Optimistic replace: returns false if another node changed the
            // state between our read and this write.
            boolean success = stateManager.replace(oldState, stateMap, Scope.CLUSTER);

            if (success) {
                session.transfer(flowFile, REL_SUCCESS);
            } else {
                session.transfer(flowFile, REL_FAILURE);
            }
        } catch (Exception e) {
            // Fixed copy-paste bug: message previously said "HelloWorldProcessor failure".
            getLogger().error("HelloStateProcessor failure", e);
            throw new ProcessException("HelloStateProcessor failure", e);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.helloworld;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Demo processor that writes "Hello &lt;name&gt;!" into the FlowFile content,
 * where the name comes from the FlowFile's {@code name} attribute (default
 * "Anonymous"). Special attribute values exercise framework behaviors:
 * "Exception" throws, "Yield" yields and drops the FlowFile, "Penalize" and
 * "Rollback" roll the session back, "Failure" penalizes and routes to failure.
 *
 * <p>NOTE(review): this class lives in the slack bundle and is unrelated to
 * NIFI-11590 (Enum handling in FlowFileTable) — it appears to be demo code
 * committed by accident; confirm it is intentional.</p>
 */
@Tags("hello")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class HelloWorldProcessor extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .build();

    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected void init(ProcessorInitializationContext context) {
        getLogger().info("init");
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        getLogger().info("customValidate");
        return Collections.emptyList();
    }

    @OnScheduled
    public void onScheduled() {
        getLogger().info("onScheduled");
    }

    @OnUnscheduled
    public void onUnscheduled() {
        getLogger().info("onUnscheduled");
    }

    @OnStopped
    public void onStopped() {
        getLogger().info("onStopped");
    }

    @OnShutdown
    public void onShutdown() {
        getLogger().info("onShutdown");
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        try {
            getLogger().info("onTrigger");

            FlowFile flowFile = session.get();
            if (flowFile == null) {
                getLogger().info("Null FlowFile");
                return;
            }

            String name = flowFile.getAttribute("name");
            if (name == null) {
                name = "Anonymous";
            }

            if ("Exception".equals(name)) {
                throw new ProcessException("exception");
                // => session.rollback(true) in AbstractProcessor (if thrown from onTrigger())
            }

            if ("Yield".equals(name)) {
                context.yield();
                session.remove(flowFile);
                return;
            }

            if ("Penalize".equals(name)) {
                session.penalize(flowFile);
                session.rollback();
                return;
                // => no effect, will be rolled back
                // same as Rollback case below
                // session.rollback(true) should be used or ProcessException should be thrown
            }

            if ("Rollback".equals(name)) {
                session.rollback();
                return;
                // => retriggered immediately
            }

            if ("Failure".equals(name)) {
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            String greeting = String.format("Hello %s!", name);
            getLogger().info(greeting);

            // Fixed: use an explicit charset; bare getBytes() depends on the
            // platform default encoding.
            try (OutputStream os = session.write(flowFile)) {
                os.write(greeting.getBytes(StandardCharsets.UTF_8));
            }

            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("HelloWorldProcessor failure", e);
            throw new ProcessException("HelloWorldProcessor failure", e);
        }
    }
}
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java @@ -227,6 +227,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran return typeFactory.createJavaType(BigInteger.class); case DECIMAL: return typeFactory.createJavaType(BigDecimal.class); + case ENUM: + return typeFactory.createJavaType(Enum.class); case CHOICE: final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType; DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 20f6dca199..d8a9a1f069 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -251,7 +251,7 @@ public class TestQueryRecord { runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader"); runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); runner.setProperty(REL_NAME, - "SELECT title, name" + + "SELECT title, name, jobLevel" + " FROM FLOWFILE" + " WHERE CARDINALITY(addresses) > 1"); @@ -270,6 +270,7 @@ public class TestQueryRecord { final Record output = written.get(0); assertEquals("John Doe", output.getValue("name")); assertEquals("Software Engineer", output.getValue("title")); + assertEquals(JobLevel.IC2, output.getValue("jobLevel")); } @Test @@ -777,6 +778,7 @@ public class TestQueryRecord { * { * "name": "John Doe", * "title": "Software Engineer", + * "jobLevel": "IC2", * "age": 40, * "addresses": [{ * "streetNumber": 4820, @@ -815,6 +817,7 @@ public class TestQueryRecord { 
personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); personFields.add(new RecordField("age", RecordFieldType.INT.getDataType())); personFields.add(new RecordField("title", RecordFieldType.STRING.getDataType())); + personFields.add(new RecordField("jobLevel", RecordFieldType.ENUM.getDataType())); personFields.add(new RecordField("height", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.DOUBLE.getDataType(), RecordFieldType.INT.getDataType()))); personFields.add(new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.RECORD.getRecordDataType(addressSchema)) )); final RecordSchema personSchema = new SimpleRecordSchema(personFields); @@ -844,6 +847,7 @@ public class TestQueryRecord { map.put("age", 30); map.put("height", 60.5); map.put("title", "Software Engineer"); + map.put("jobLevel", JobLevel.IC2); map.put("addresses", new Record[] {homeAddress, workAddress}); return new MapRecord(personSchema, map); } @@ -1281,4 +1285,10 @@ public class TestQueryRecord { } + public enum JobLevel { + IC1, + IC2, + IC3 + } + }