Colin,

You're using the ConsumeKafka processors. Given that this is Avro data for which you have a schema, I strongly recommend you use ConsumeKafkaRecord_0_10...
With it, you get to specify the record reader/writer you'll need. You will also see dramatically higher performance. Let's get you reliably reading records from Kafka and then move on to other details such as LookupRecord. I suspect we'll need to see the actual error information you're getting to be of much help.

Thanks

On Wed, Mar 21, 2018 at 9:33 PM, Colin Williams <colin.williams.seat...@gmail.com> wrote:
> Hi Joe,
>
> I don't believe the Avro schema is included, and expect they are the data
> portion... I think that's why I need to use the avsc file mentioned above...
>
> On Wed, Mar 21, 2018 at 6:19 PM, Joe Witt <joe.w...@gmail.com> wrote:
>>
>> Can you share a template of your process group?
>>
>> Do the messages in Kafka have the Avro schema included in them or are
>> they just the data portion of the record?
>>
>> On Wed, Mar 21, 2018 at 9:16 PM, Colin Williams
>> <colin.williams.seat...@gmail.com> wrote:
>> > I have an avro avsc file for a table with a definition like:
>> >
>> >
>> > {"type":"record","name":"INV_ADJ","namespace":"NSP_SCH","fields":[{"name":"table","type":"string"},{"name":"op_type","type":"string"},{"name":"op_ts","type":"string"},{"name":"current_ts","type":"string"},{"name":"pos","type":"string"},{"name":"primary_keys","type":{"type":"array","items":"string"}},{"name":"tokens","type":{"type":"map","values":"string"},"default":{}},{"name":"before","type":["null",{"type":"record","name":"columns","fields":[{"name":"ITEM","type":["null","string"],"default":null},{"name":"ITEM_isMissing","type":"boolean"},{"name":"INV_STATUS","type":["null","long"],"default":null},{"name":"INV_STATUS_isMissing","type":"boolean"},{"name":"LOC_TYPE","type":["null","string"],"default":null},{"name":"LOC_TYPE_isMissing","type":"boolean"},{"name":"LOCATION","type":["null","long"],"default":null},{"name":"LOCATION_isMissing","type":"boolean"},{"name":"ADJ_QTY","type":["null","double"],"default":null},{"name":"ADJ_QTY_isMissing","type":"boolean"},{"name":"REASON","type":["null","long"],"default":null},{"name":"REASON_isMissing","type":"boolean"},{"name":"ADJ_DATE","type":["null","string"],"default":null},{"name":"ADJ_DATE_isMissing","type":"boolean"},{"name":"PREV_QTY","type":["null","double"],"default":null},{"name":"PREV_QTY_isMissing","type":"boolean"},{"name":"USER_ID","type":["null","string"],"default":null},{"name":"USER_ID_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT","type":["null","double"],"default":null},{"name":"ADJ_WEIGHT_isMissing","type":"boolean"},{"name":"ADJ_WEIGHT_UOM","type":["null","string"],"default":null},{"name":"ADJ_WEIGHT_UOM_isMissing","type":"boolean"},{"name":"CREATE_ID","type":["null","string"],"default":null},{"name":"CREATE_ID_isMissing","type":"boolean"},{"name":"CREATE_DATETIME","type":["null","string"],"default":null},{"name":"CREATE_DATETIME_isMissing","type":"boolean"}]}],"default":null},{"name":"after","type":["null","columns"],"default":null}]}
>> >
>> > I have a Kafka topic which should contain Avro records using the above
>> > definition.
>> >
>> > I've configured the Avro registry, reader, and writer with the above
>> > definition. When I try using my NiFi workflow I get exceptions like
>> > "invalid int encoding" and it doesn't seem to process any data.
>> >
>> > What am I doing wrong?
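
A note on the two technical points in this thread: whether each Kafka message carries its schema, and what an "invalid int encoding" error may indicate. The sketch below is illustrative only and is not taken from NiFi or the Avro library. It assumes three possible layouts for a message value: an Avro object container file (which starts with the bytes "Obj" followed by 0x01 and embeds the schema), a framed message with a leading 0x00 byte and a 4-byte big-endian schema id (the Confluent-style convention, which nobody in the thread has confirmed is in use), or a bare binary datum. The function names classify_payload and read_avro_int are hypothetical. Avro ints are zigzag varints of at most five bytes, so a reader that starts decoding at the wrong offset, or with a schema that does not match the bytes, can plausibly run into a malformed varint, which is one way an error like the one reported could arise.

```python
import struct

# First four bytes of an Avro object container file (schema embedded in the header).
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def classify_payload(payload: bytes) -> str:
    """Heuristically guess how a Kafka message value was serialized."""
    if payload.startswith(AVRO_CONTAINER_MAGIC):
        return "container"  # schema travels with the data
    if len(payload) >= 5 and payload[0] == 0:
        # Assumption: Confluent-style framing, 0x00 + 4-byte big-endian schema id.
        # A bare datum whose first string field is empty would also start with 0x00,
        # so treat this result as a hint, not a proof.
        (schema_id,) = struct.unpack(">I", payload[1:5])
        return f"framed(schema_id={schema_id})"
    return "raw"  # bare datum; the reader must be configured with the schema


def read_avro_int(buf: bytes, pos: int = 0) -> tuple:
    """Decode one Avro int (zigzag varint) and return (value, next_position)."""
    result = 0
    shift = 0
    for i in range(5):  # a 32-bit int occupies at most 5 varint bytes
        if pos + i >= len(buf):
            raise ValueError("truncated int")
        byte = buf[pos + i]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last byte
            return (result >> 1) ^ -(result & 1), pos + i + 1
        shift += 7
    raise ValueError("invalid int encoding")  # continuation bit set on all 5 bytes
```

With the INV_ADJ schema above, the first field is the string "table", so a bare datum would begin with a small varint giving that string's length. Dumping the first few bytes of one message and passing them to classify_payload is a quick way to answer the "schema included or data only" question before choosing a record reader.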