NIFI-750 adding a way to specify attribute names when constructing the spout
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0d2842e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0d2842e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0d2842e4 Branch: refs/heads/master Commit: 0d2842e4f9df2c1734c409d80511601765d8dc1b Parents: 2084024 Author: Bryan Bende <bbe...@apache.org> Authored: Sat Jul 4 12:38:33 2015 -0400 Committer: Bryan Bende <bbe...@apache.org> Committed: Sat Jul 4 12:38:33 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-external/nifi-storm-spout/pom.xml | 2 +- .../java/org/apache/nifi/storm/NiFiSpout.java | 38 ++++++++++++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2842e4/nifi/nifi-external/nifi-storm-spout/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-storm-spout/pom.xml b/nifi/nifi-external/nifi-storm-spout/pom.xml index c55c698..353dd2d 100644 --- a/nifi/nifi-external/nifi-storm-spout/pom.xml +++ b/nifi/nifi-external/nifi-storm-spout/pom.xml @@ -27,7 +27,7 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> - <version>0.9.4</version> + <version>0.9.5</version> <scope>provided</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2842e4/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java index 5063123..64dac6f 100644 --- a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java +++ b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java @@ -112,14 +112,35 @@ public class NiFiSpout extends BaseRichSpout { public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class); + public static final String NIFI_DATA_PACKET = "nifiDataPacket"; + private NiFiSpoutReceiver spoutReceiver; private LinkedBlockingQueue<NiFiDataPacket> queue; private SpoutOutputCollector spoutOutputCollector; private final SiteToSiteClientConfig clientConfig; + private final List<String> attributeNames; + /** + * @param clientConfig + * configuration used to build the SiteToSiteClient + */ public NiFiSpout(SiteToSiteClientConfig clientConfig) { + this(clientConfig, null); + } + + /** + * + * @param clientConfig + * configuration used to build the SiteToSiteClient + * @param attributeNames + * names of FlowFile attributes to be added as values to each tuple, in addition + * to the nifiDataPacket value on all tuples + * + */ + public NiFiSpout(SiteToSiteClientConfig clientConfig, List<String> attributeNames) { this.clientConfig = clientConfig; + this.attributeNames = (attributeNames == null ? new ArrayList<String>() : attributeNames); } @Override @@ -139,13 +160,26 @@ public class NiFiSpout extends BaseRichSpout { if (data == null) { Utils.sleep(50); } else { - spoutOutputCollector.emit(new Values(data)); + // always start with the data packet + Values values = new Values(data); + + // add additional values based on the specified attribute names + for (String attributeName : attributeNames) { + if (data.getAttributes().containsKey(attributeName)) { + values.add(data.getAttributes().get(attributeName)); + } + } + + spoutOutputCollector.emit(values); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("nifiDataPacket")); + final List<String> fieldNames = new ArrayList<>(); + fieldNames.add(NIFI_DATA_PACKET); + fieldNames.addAll(attributeNames); + outputFieldsDeclarer.declare(new Fields(fieldNames)); } @Override