Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2671#discussion_r185740310
  
    --- Diff: 
nifi-nar-bundles/nifi-marklogic-bundle/nifi-marklogic-processors/src/main/java/com/marklogic/nifi/processor/QueryMarkLogic.java
 ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package com.marklogic.nifi.processor;
    +
    +import com.marklogic.client.datamovement.ExportListener;
    +import com.marklogic.client.ext.datamovement.job.SimpleQueryBatcherJob;
    +import com.marklogic.client.io.BytesHandle;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +
    +@Tags({"MarkLogic"})
    +@CapabilityDescription("Creates FlowFiles from batches of documents, 
matching the given criteria," +
    +    " retrieved from a MarkLogic server using the MarkLogic Data Movement 
SDK (DMSDK)")
    +public class QueryMarkLogic extends AbstractMarkLogicProcessor {
    +
    +    /**
    +     * Boolean-validated property, defaulting to "true". onTrigger reads it with
    +     * asBoolean() and passes the result to SimpleQueryBatcherJob.setConsistentSnapshot.
    +     */
    +    public static final PropertyDescriptor CONSISTENT_SNAPSHOT = new 
PropertyDescriptor.Builder()
    +        .name("Consistent snapshot")
    +        .displayName("Consistent snapshot")
    +        .defaultValue("true")
    +        .description("Boolean used to indicate that the matching documents 
were retrieved from a " +
    +            "consistent snapshot")
    +        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +        .build();
    +
    +    /**
    +     * Comma-separated collection names with no default value. When a value is
    +     * present, onTrigger splits it on "," and passes the pieces to
    +     * SimpleQueryBatcherJob.setWhereCollections; a null value is skipped.
    +     * NOTE(review): the pieces are not trimmed, so "a, b" yields " b" - confirm
    +     * that this is intended.
    +     */
    +    public static final PropertyDescriptor COLLECTIONS = new 
PropertyDescriptor.Builder()
    +        .name("Collections")
    +        .displayName("Collections")
    +        .description("Comma-separated list of collections to query from a 
MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    /**
    +     * URIs query with no default value. onTrigger passes the raw property value
    +     * (without a null check) to SimpleQueryBatcherJob.setWhereUrisQuery.
    +     */
    +    public static final PropertyDescriptor URIS_QUERY = new 
PropertyDescriptor.Builder()
    +        .name("URIs query")
    +        .displayName("URIs query")
    +        .description("CTS URI Query for retrieving documents from a 
MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    /**
    +     * URI pattern with no default value. onTrigger passes the raw property value
    +     * (without a null check) to SimpleQueryBatcherJob.setWhereUriPattern.
    +     */
    +    public static final PropertyDescriptor URI_PATTERN = new 
PropertyDescriptor.Builder()
    +        .name("URI pattern")
    +        .displayName("URI pattern")
    +        .description("URI pattern for retrieving documents from a 
MarkLogic server")
    +        .addValidator(NO_VALIDATION_VALIDATOR)
    +        .build();
    +
    +    /**
    +     * Relationship named "SUCCESS"; init registers it as this processor's only
    +     * relationship.
    +     */
    +    protected static final Relationship SUCCESS = new 
Relationship.Builder()
    +        .name("SUCCESS")
    +        .description("All FlowFiles that are created from documents read 
from MarkLogic are routed to" +
    +            " this success relationship")
    +        .build();
    +
    +    /**
    +     * Registers this processor's property descriptors and relationships.
    +     * Runs super.init first, then appends the four query properties to whatever
    +     * is already in {@code properties} and replaces {@code relationships} with a
    +     * set containing only SUCCESS.
    +     *
    +     * @param context initialization context, forwarded unchanged to super.init
    +     */
    +    @Override
    +    public void init(ProcessorInitializationContext context) {
    +        super.init(context);
    +
    +        // Copy the existing descriptors so the query-specific ones can be appended.
    +        List<PropertyDescriptor> list = new ArrayList<>();
    +        list.addAll(properties);
    +        list.add(CONSISTENT_SNAPSHOT);
    +        list.add(COLLECTIONS);
    +        list.add(URIS_QUERY);
    +        list.add(URI_PATTERN);
    +        // Publish read-only views of both collections.
    +        properties = Collections.unmodifiableList(list);
    +        Set<Relationship> set = new HashSet<>();
    +        set.add(SUCCESS);
    +        relationships = Collections.unmodifiableSet(set);
    +    }
    +
    +    @Override
    +    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
    +        SimpleQueryBatcherJob job = new SimpleQueryBatcherJob();
    +        job.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
    +        job.setThreadCount(context.getProperty(THREAD_COUNT).asInteger());
    +        
job.setConsistentSnapshot(context.getProperty(CONSISTENT_SNAPSHOT).asBoolean());
    +
    +        String value = context.getProperty(COLLECTIONS).getValue();
    +        if (value != null) {
    +            job.setWhereCollections(value.split(","));
    +        }
    +
    +        
job.setWhereUriPattern(context.getProperty(URI_PATTERN).getValue());
    +        job.setWhereUrisQuery(context.getProperty(URIS_QUERY).getValue());
    +
    +        // TODO Replace ExportListener with custom listener so that we can 
do a commit per batch
    --- End diff --
    
    I would strongly recommend doing this TODO. `GetMongo` follows a similar 
pattern, and it really saves the user time and resources to have a batch of 
documents written to a single flowfile instead of one flowfile per document. 
For the loads MarkLogic is known to handle, that could be a significant volume 
of flowfiles, especially if a user uses this to do the equivalent of a full 
table fetch.


---

Reply via email to