Dave,

Your snippet looks good on the inside, but since you want a ScriptedRecordSetWriter, you will want to create that instead of a Processor, something like this:

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream

class GroovyRecordSetWriter implements RecordSetWriter {

    private int recordCount = 0
    private final OutputStream out
    private final DistributedMapCacheClient mapCacheClient

    GroovyRecordSetWriter(final OutputStream out, final DistributedMapCacheClient mapCacheClient) {
        this.out = out
        this.mapCacheClient = mapCacheClient
    }

    @Override
    WriteResult write(Record r) throws IOException {
        // with{} neither flushes nor closes osw, so flush it once you actually write something
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            // TODO - stuff with records and the map cache
        }
        recordCount++
        WriteResult.of(1, [:])
    }

    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            // TODO - stuff with records and the map cache, incrementing count per record
        }
        WriteResult.of(count, [:])
    }

    // RecordWriter also declares getMimeType(); the value here is only a placeholder
    @Override
    String getMimeType() { 'text/plain' }

    @Override
    void beginRecordSet() throws IOException { }

    @Override
    WriteResult finishRecordSet() throws IOException {
        WriteResult.of(recordCount, [:])
    }

    @Override
    void close() throws IOException { }

    @Override
    void flush() throws IOException { }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    def mapCacheClient

    // Properties
    static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
        .name("cache-client")
        .displayName("Cache Client")
        .description("Specifies the Controller Service to use for accessing the distributed map cache")
        .identifiesControllerService(DistributedMapCacheClient.class)
        .required(true)
        .build()

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        [CACHE_CLIENT]
    }

    // Assumption: the scripted wrapper hands its ConfigurationContext to the script
    // object through a setConfigurationContext method when one is defined. The init()
    // hook only receives a ControllerServiceInitializationContext, which has no
    // getProperty(), so the lookup cannot live there. Verify against your NiFi version.
    void setConfigurationContext(ConfigurationContext context) {
        mapCacheClient = context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
    }

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out, mapCacheClient)
    }
}

// The scripted controller service looks for a variable named "writer"
writer = new GroovyRecordSetWriterFactory()

My code is sloppy, as it's a combo of copy-paste jobs, but I hope it shows the basic skeleton of a RecordSetWriter, a RecordSetWriterFactory, and creating/using Properties. I'd need to clean it up and polish it before it became a blog post :)

Also, in general, even though we're using a "built-in" scripted controller service, most of the conversations around them delve deep into code. You may want to consider emailing the dev mailing list rather than the users list for this type of question in the future; you would likely get more attention from folks who spend lots of time with the codebase.

Regards,
Matt

On Fri, Aug 21, 2020 at 3:35 PM davide <david.ea...@grokstream.com> wrote:
>
> Matt,
>
> Thanks for the reply. And this is where I admit to being relatively new to
> Java environments (having done mostly Python and C++ if you go back far
> enough). So I understand if you don't have the cycles to give me too much
> of a foundational lesson, but here is where I am stuck:
>
> You said that I needed to "provide the properties [my]self", but I am not
> sure what you mean by that.
>
> With ExecuteScript, you do enter the properties in the processor config and
> reference them directly. I understand the basic relationship between a
> PropertyValue item and the asControllerService() method because that is
> exactly what you use in ExecuteScript, you just don't have to do anything
> special to "get" the property, it is just there.
>
> I looked over the code, and I created a snippet below (based on your code
> and with in between bits removed). Do I need to add the UUID for the
> controller somewhere directly in here? I guess I am not 100% clear what I
> am referencing and where to put my value in...
>
> --------------------
>
> class MyRecordProcessor extends AbstractProcessor {
>
>     // Properties
>     static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
>         .name("record-reader")
>         .displayName("Cache Client")
>         .description("Specifies the Controller Service to use for reading incoming data")
>         .identifiesControllerService(DistributedMapCacheClient.class)
>         .required(true)
>         .build()
>
>     @Override
>     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
>         def properties = [] as ArrayList
>         properties.add(CACHE_CLIENT)
>         properties
>     }
>
>     @Override
>     void onTrigger(ProcessContext context, ProcessSession session) {
>         def flowFile = session.get()
>         if (!flowFile) return
>
>         def cachemap = context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/