Dave,

Your snippet looks good on the inside, but since you want a
ScriptedRecordSetWriter, you'll need to create a RecordSetWriterFactory
(plus the RecordSetWriter it returns) instead of a Processor, something
like this:

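import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream

class GroovyRecordSetWriter implements RecordSetWriter {
    private int recordCount = 0
    private final OutputStream out
    private final DistributedMapCacheClient mapCacheClient

    GroovyRecordSetWriter(final OutputStream out, final DistributedMapCacheClient mapCacheClient) {
        this.out = out
        this.mapCacheClient = mapCacheClient
    }

    // Writes a single record; the wrapper keeps the framework-owned stream open
    @Override
    WriteResult write(Record r) throws IOException {
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            // TODO - stuff with records and the map cache
            // (remember to flush osw; with() does not flush or close it)
        }

        recordCount++
        WriteResult.of(1, [:])
    }

    // Writes a whole record set in one call
    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0

        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            // TODO - stuff with records and the map cache
            // (increment count per record written, and flush osw)
        }
        WriteResult.of(count, [:])
    }

    // Placeholder MIME type; adjust to whatever format you actually write
    @Override
    String getMimeType() {
        'text/plain'
    }

    @Override
    void beginRecordSet() throws IOException {
    }

    @Override
    WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:])
    }

    @Override
    void close() throws IOException {
    }

    @Override
    void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    DistributedMapCacheClient mapCacheClient

    // Properties
    static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
            .name("cache-client")
            .displayName("Cache Client")
            .description("Specifies the Controller Service to use for cache lookups")
            .identifiesControllerService(DistributedMapCacheClient.class)
            .required(true)
            .build()

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        [CACHE_CLIENT]
    }

    // Property values are not available from init(), so resolve the cache
    // client here. Assumption: ScriptedRecordSetWriter invokes
    // onEnabled(context) on this object when the service is enabled.
    void onEnabled(final ConfigurationContext context) {
        mapCacheClient = context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
    }

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out, mapCacheClient)
    }

}

// The scripted service looks for a variable named "writer"
writer = new GroovyRecordSetWriterFactory()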

My code is sloppy since it's a combination of copy-paste jobs, but I
hope it shows the basic skeleton of a RecordSetWriter, a
RecordSetWriterFactory, and creating/using properties. I'd need to clean
it up and polish it before it became a blog post :)
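
One detail worth spelling out about the NonCloseableOutputStream wrapper
in the write() methods: Groovy's with() neither flushes nor closes the
writer, so whatever replaces the TODO has to flush it (or use
withWriter, which flushes and closes). The standalone sketch below
illustrates the idea with JDK classes only. KeepOpenOutputStream and
TrackingStream are hypothetical stand-ins made up for this example; the
first only mimics the behavior assumed of NiFi's wrapper (flush on
close, never close the wrapped stream) and is not the NiFi class itself.

```groovy
// Hypothetical stand-in for a non-closing wrapper: close() only flushes.
class KeepOpenOutputStream extends FilterOutputStream {
    private final OutputStream wrapped

    KeepOpenOutputStream(OutputStream wrapped) {
        super(wrapped)
        this.wrapped = wrapped
    }

    @Override
    void close() throws IOException {
        wrapped.flush()   // push pending bytes through, but leave the stream open
    }
}

// Hypothetical helper that records whether close() was ever called on it.
class TrackingStream extends ByteArrayOutputStream {
    boolean closed = false

    @Override
    void close() throws IOException {
        closed = true
        super.close()
    }
}

// Plays the role of the stream the framework hands to createWriter()
def target = new TrackingStream()

// One short-lived writer per "record", as in write(Record) above
['first', 'second'].each { line ->
    new OutputStreamWriter(new KeepOpenOutputStream(target), 'UTF-8').withWriter { w ->
        w.write(line + '\n')
    }   // withWriter flushes and closes w; the wrapper absorbs the close
}

println "underlying stream closed? ${target.closed}"
```

Because each writer is closed by withWriter, every record's bytes reach
the underlying stream, while the stream itself stays open for the next
record and for whoever owns it.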

Also, in general, even though we're using a "built-in" scripted
controller service, most conversations about them delve deep into code.
You may want to email the dev mailing list rather than the users list
for this type of question in the future; you would likely get more
attention from folks who spend lots of time with the codebase.

Regards,
Matt

On Fri, Aug 21, 2020 at 3:35 PM davide <david.ea...@grokstream.com> wrote:
>
> Matt,
>
> Thanks for the reply.  And this is where I admit to being relatively new to
> Java environments (having done mostly Python and C++ if you go back far
> enough).  So I understand if you don't have the cycles to give me too much
> of a foundational lesson, but here is where I am stuck:
>
> You said that I needed to "provide the properties [my]self", but I am not
> sure what you mean by that.
>
> With ExecuteScript, you do enter the properties in the processor config and
> reference them directly.  I understand the basic relationship between a
> PropertyValue item and the asControllerService() method because that is
> exactly what you use in ExecuteScript, you just don't have to do anything
> special to "get" the property, it is just there.
>
> I looked over the code, and I created a snippet below (based on your code
> and with in between bits removed).  Do I need to add the UUID for the
> controller somewhere directly in here?  I guess I am not 100% clear what I
> am referencing and where to put my value in...
>
> --------------------
>
> class MyRecordProcessor extends AbstractProcessor {
>
>     // Properties
>     static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
>             .name("record-reader")
>             .displayName("Cache Client")
>             .description("Specifies the Controller Service to use for reading incoming data")
>             .identifiesControllerService(DistributedMapCacheClient.class)
>             .required(true)
>             .build()
>
>
>     @Override
>     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
>         def properties = [] as ArrayList
>         properties.add(CACHE_CLIENT)
>         properties
>     }
>
>
>     @Override
>     void onTrigger(ProcessContext context, ProcessSession session) {
>         def flowFile = session.get()
>         if (!flowFile) return
>
>         def cachemap = context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
>
>
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
