Thanks for the update. Is there any discussion around decoupling incoming message delimiters from outgoing batching? I believe this is how it worked up to 0.4.x. Should I make an enhancement request?
Thanks, Ralph On 4/29/16, 1:02 PM, "Joe Witt" <joe.w...@gmail.com> wrote: >Ralph, > >Possibly related to https://issues.apache.org/jira/browse/NIFI-1827. >Clearly something to get sorted out promptly. > >Thanks >Joe > >On Fri, Apr 29, 2016 at 2:45 PM, Perko, Ralph J <ralph.pe...@pnnl.gov> >wrote: >> Hi >> >> We are using Kafka as our messaging backbone and having been using Nifi >> since 0.3.x It has been a real game changer thanks for the work! >> >> In 0.6.1 When we set concurrent to anything greater than 1 and partition >> strategy to "round robin" we are getting an invalid partition error: >> >> 2016-04-29 17:44:31,536 ERROR [Timer-Driven Process Thread-6] >> o.apache.nifi.processors.kafka.PutKafka >> PutKafka[id=67a1e471-1548-407c-bedc-a2a6212c2458] >> PutKafka[id=67a1e471-1548-407c-bedc-a2a6212c2458] failed to process >>session >> due to java.lang.IllegalArgumentException: Invalid partition given with >> record: 165 is not in the range [0...10].: >> java.lang.IllegalArgumentException: Invalid partition given with >>record: 165 >> is not in the range [0...10]. >> >> Setting the partition strategy to ³random² seems to clear up this issue. >> >> It also seems batching is not working as it did in previous versions. >> Poking around in the code and comparing the PutKafka in 0.4.1 to 0.6.1 >>it >> appears the incoming message delimiter is now tied to outgoing >>batching, but >> from what I can tell this is not the case in 0.4.1. >> >> PutKafka.java:438 >> >> if (context.getProperty(MESSAGE_DELIMITER).isSet()) { >> properties.setProperty("batch.size", >> context.getProperty(BATCH_NUM_MESSAGES).getValue()); >> } else { >> properties.setProperty("batch.size", "1"); >> } >> >> We have many single small messages coming in and do not need the message >> delimiter but still want to batch. >> >> >> It could very well be I am misunderstanding how this works. Should I be >> using it differently? Should I drop in the 0.4.1 kafka nar to get the >> batching behavior we are looking for? >> >> Thanks, >> Ralph >>