Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2448#discussion_r166320282

    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -221,22 +258,33 @@ private void configureMapper(String setting) {
             }
         }

    -    private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
    -        return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
    +    private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
    +        return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
                     : mapper.writer();
         }

    -    private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
    +    private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
             FlowFile flowFile = session.create();
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
                 public void process(OutputStream out) throws IOException {
                     out.write(payload.getBytes("UTF-8"));
                 }
             });
    -        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +        Map<String, String> attrs = new HashMap<>();
    +        attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (count != null) {
    +            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
    +            attrs.put(PROGRESS_END, String.valueOf(index));
    +            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
    +        }
    +        flowFile = session.putAllAttributes(flowFile, attrs);
             session.getProvenanceReporter().receive(flowFile, getURI(context));
    +        session.transfer(flowFile, REL_SUCCESS);
    +        if (doCommit) {
    +            session.commit();
    +        }
    --- End diff --

    Yeah, that's what I wanted to understand. Along with a flowfile per batch, it writes the progress (if enabled)? LGTM. +1
---