Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2671#discussion_r185745410

--- Diff: nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/PutMarkLogic.java ---
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.marklogic.nifi.processor;
+
+import com.marklogic.client.datamovement.DataMovementManager;
+import com.marklogic.client.datamovement.WriteBatcher;
+import com.marklogic.client.datamovement.WriteEvent;
+import com.marklogic.client.datamovement.impl.WriteEventImpl;
+import com.marklogic.client.document.ServerTransform;
+import com.marklogic.client.io.BytesHandle;
+import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.Format;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * The TriggerWhenEmpty annotation is used so that this processor has a chance to flush the WriteBatcher when no
+ * flowfiles are ready to be received.
+ */
+@Tags({"MarkLogic"})
+@CapabilityDescription("Write batches of FlowFiles as documents to a MarkLogic server using the " +
+    "MarkLogic Data Movement SDK (DMSDK)")
+@TriggerWhenEmpty
+public class PutMarkLogic extends AbstractMarkLogicProcessor {
+
+    class FlowFileInfo {
+        FlowFile flowFile;
+        ProcessSession session;
+        FlowFileInfo(FlowFile flowFile, ProcessSession session) {
+            this.flowFile = flowFile;
+            this.session = session;
+        }
+    }
+
+    private Map<String, FlowFileInfo> URIFlowFileMap = new HashMap<>();
+
+    public static final PropertyDescriptor COLLECTIONS = new PropertyDescriptor.Builder()
+        .name("Collections")
+        .displayName("Collections")
+        .description("Comma-delimited sequence of collections to add to each document")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
+        .name("Format")
+        .displayName("Format")
+        .description("Format for each document; if not specified, MarkLogic will determine the format" +
+            " based on the URI")
+        .allowableValues(Format.JSON.name(), Format.XML.name(), Format.TEXT.name(), Format.BINARY.name(), Format.UNKNOWN.name())
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor JOB_ID = new PropertyDescriptor.Builder()
+        .name("Job ID")
+        .displayName("Job ID")
+        .description("ID for the WriteBatcher job")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
+        .name("Job Name")
+        .displayName("Job Name")
+        .description("Name for the WriteBatcher job")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor MIMETYPE = new PropertyDescriptor.Builder()
+        .name("MIME type")
+        .displayName("MIME type")
+        .description("MIME type for each document; if not specified, MarkLogic will determine the " +
+            "MIME type based on the URI")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
+        .name("Permissions")
+        .displayName("Permissions")
+        .defaultValue("rest-reader,read,rest-writer,update")
+        .description("Comma-delimited sequence of permissions - role1, capability1, role2, " +
+            "capability2 - to add to each document")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor TEMPORAL_COLLECTION = new PropertyDescriptor.Builder()
+        .name("Temporal collection")
+        .displayName("Temporal collection")
+        .description("The temporal collection to use for a temporal document insert")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor TRANSFORM = new PropertyDescriptor.Builder()
+        .name("Server transform")
+        .displayName("Server transform")
+        .description("(Optional) The name of REST server transform to apply to every document as it's" +
+            " written")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor URI_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("URI attribute name")
+        .displayName("URI attribute name")
+        .defaultValue("uuid")
+        .required(true)
+        .description("The name of the FlowFile attribute whose value will be used as the URI")
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor URI_PREFIX = new PropertyDescriptor.Builder()
+        .name("URI prefix")
+        .displayName("URI prefix")
+        .description("(Optional) The prefix to prepend to each URI")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor URI_SUFFIX = new PropertyDescriptor.Builder()
+        .name("URI suffix")
+        .displayName("URI suffix")
+        .description("(Optional) The suffix to append to each URI")
+        .addValidator(NO_VALIDATION_VALIDATOR)
+        .build();
+
+    protected static final Relationship SUCCESS = new Relationship.Builder()
+        .name("SUCCESS")
+        .description("All FlowFiles that are successfully written to MarkLogic are routed to the " +
+            "success relationship for future processing")
+        .build();
+
+    protected static final Relationship FAILURE = new Relationship.Builder()
+        .name("FAILURE")
+        .description("All FlowFiles that failed to be written to MarkLogic are routed to the " +
+            "failure relationship for future processing")
+        .build();
+
+    private DataMovementManager dataMovementManager;
+    private WriteBatcher writeBatcher;
+
+    // If no FlowFile exists when this processor is triggered, this variable determines whether or not a call is made to
+    // flush the WriteBatcher
+    private boolean shouldFlushIfEmpty = true;
+
+    @Override
+    public void init(ProcessorInitializationContext context) {
+        super.init(context);
+
+        List<PropertyDescriptor> list = new ArrayList<>();
+        list.addAll(properties);
+        list.add(COLLECTIONS);
+        list.add(FORMAT);
+        list.add(JOB_ID);
+        list.add(JOB_NAME);
+        list.add(MIMETYPE);
+        list.add(PERMISSIONS);
+        list.add(TRANSFORM);
+        list.add(TEMPORAL_COLLECTION);
+        list.add(URI_ATTRIBUTE_NAME);
+        list.add(URI_PREFIX);
+        list.add(URI_SUFFIX);
+        properties = Collections.unmodifiableList(list);
+
+        Set<Relationship> set = new HashSet<>();
+        set.add(SUCCESS);
+        set.add(FAILURE);
+        relationships = Collections.unmodifiableSet(set);
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        dataMovementManager = getDatabaseClient(context).newDataMovementManager();
+        writeBatcher = dataMovementManager.newWriteBatcher()
+            .withJobId(context.getProperty(JOB_ID).getValue())
+            .withJobName(context.getProperty(JOB_NAME).getValue())
+            .withBatchSize(context.getProperty(BATCH_SIZE).asInteger())
+            .withThreadCount(context.getProperty(THREAD_COUNT).asInteger())
+            .withTemporalCollection(context.getProperty(TEMPORAL_COLLECTION).getValue());
+
+        final String transform = context.getProperty(TRANSFORM).getValue();
+        if (transform != null) {
+            writeBatcher.withTransform(new ServerTransform(transform));
+        }
+        this.writeBatcher.onBatchSuccess(writeBatch -> {
+            for(WriteEvent writeEvent : writeBatch.getItems()) {
+                routeDocumentToRelationship(writeEvent, SUCCESS);
+            }
+        }).onBatchFailure((writeBatch, throwable) -> {
+            for(WriteEvent writeEvent : writeBatch.getItems()) {
+                routeDocumentToRelationship(writeEvent, FAILURE);
+            }
+        });
+        dataMovementManager.startJob(writeBatcher);
+    }
+
+    private void routeDocumentToRelationship(WriteEvent writeEvent, Relationship relationship) {
+        DocumentMetadataHandle metadata = (DocumentMetadataHandle) writeEvent.getMetadata();
+        String flowFileUUID = metadata.getMetadataValues().get("flowFileUUID");
+        FlowFileInfo flowFile = URIFlowFileMap.get(flowFileUUID);
+        if(flowFile != null) {
--- End diff --

Have you tried this with a very large data set to see how it performs? Like millions of sample XML records?
---