Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2448#discussion_r166320282
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -221,22 +258,33 @@ private void configureMapper(String setting) {
             }
         }
     
    -    private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
    -        return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
    +    private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
    +        return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
                     : mapper.writer();
         }
     
    -    private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
    +    private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
             FlowFile flowFile = session.create();
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
                 public void process(OutputStream out) throws IOException {
                     out.write(payload.getBytes("UTF-8"));
                 }
             });
    -        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +        Map<String, String> attrs = new HashMap<>();
    +        attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (count != null) {
    +            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
    +            attrs.put(PROGRESS_END, String.valueOf(index));
    +            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
    +        }
    +        flowFile = session.putAllAttributes(flowFile, attrs);
             session.getProvenanceReporter().receive(flowFile, getURI(context));
    +
             session.transfer(flowFile, REL_SUCCESS);
    +        if (doCommit) {
    +            session.commit();
    +        }
    --- End diff --
    
    Yeah, that's what I wanted to understand. So along with emitting one flowfile per batch, it also writes the progress attributes (if enabled)? LGTM. +1
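
To make the per-batch progress attributes concrete, here is a minimal standalone sketch of the attribute-building step from `writeBatch`. It is not the processor code: the class name `ProgressAttributes`, the method `build`, and the three `progress.*` key strings are assumptions made for illustration (the diff does not show the values behind `PROGRESS_START`, `PROGRESS_END`, and `PROGRESS_ESTIMATE`), and a plain `"mime.type"` string stands in for `CoreAttributes.MIME_TYPE.key()` so the example has no NiFi dependency.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in for the attribute-building part of writeBatch. */
class ProgressAttributes {
    // Hypothetical key names; the real constant values are not visible in the diff.
    static final String PROGRESS_START = "progress.point.start";
    static final String PROGRESS_END = "progress.point.end";
    static final String PROGRESS_ESTIMATE = "progress.estimate";
    // Stand-in for CoreAttributes.MIME_TYPE.key().
    static final String MIME_TYPE_KEY = "mime.type";

    /**
     * Builds the attribute map for one batch flowfile.
     * A null count means progress reporting is disabled, so only the MIME type is set.
     */
    static Map<String, String> build(Long count, long index, int batchSize) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put(MIME_TYPE_KEY, "application/json");
        if (count != null) {
            // Same arithmetic as the diff: the batch is assumed to cover
            // the batchSize documents ending at index.
            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
            attrs.put(PROGRESS_END, String.valueOf(index));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
        }
        return attrs;
    }
}
```

With `count = 1000`, `index = 200`, and `batchSize = 100`, the map holds a start of `100`, an end of `200`, and an estimate of `1000`; with `count = null` it holds only the MIME type. One thing that follows from the arithmetic: if the final batch is smaller than `batchSize` (say `index = 250` with `batchSize = 100`), the start comes out as `150` rather than `200`. Whether the caller compensates for that is not visible in this hunk.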

