Colin,

You're using the ConsumeKafka processors.  Given that this is Avro
data for which you already have a schema, I strongly recommend you use
ConsumeKafkaRecord_0_10 instead.

There you get to specify the record reader/writer you'll need, and you
will also see dramatically higher performance.

Let's get you reliably reading records from Kafka, and then move on to
other details such as LookupRecord, etc.

I suspect we'll need to see the actual error information you're
getting to be of much help.
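For context on that "invalid int encoding" error: Avro encodes ints and longs as zigzag-encoded varints, so the error usually means the reader started decoding at the wrong byte offset -- for example, the messages carry an Avro container-file header or a schema-registry wire-format prefix that a schemaless reader doesn't expect. A minimal plain-Python sketch of that encoding (no Avro libraries; the sample message bytes are made up for illustration):

```python
def zigzag_encode(n):
    """Zigzag-encode a signed long, then emit it as an Avro varint."""
    z = (n << 1) ^ (n >> 63)          # zigzag: small magnitudes -> small codes
    out = bytearray()
    while True:
        if z > 0x7F:
            out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
            z >>= 7
        else:
            out.append(z)
            return bytes(out)

def zigzag_decode(buf, pos=0):
    """Decode one Avro varint starting at pos; return (value, new_pos)."""
    z, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise ValueError("invalid int encoding: ran off end of buffer")
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            return (z >> 1) ^ -(z & 1), pos

# An Avro string is a varint length followed by UTF-8 bytes.
msg = zigzag_encode(len(b"INV_ADJ")) + b"INV_ADJ"
length, pos = zigzag_decode(msg)       # length == 7: decodes cleanly

# The same bytes behind a hypothetical 5-byte prefix: the reader decodes
# the first 0x00 as length 0 (an empty string), and every field after
# that is misaligned -- garbage values or "invalid int encoding" errors.
framed = b"\x00\x00\x00\x00\x01" + msg
bad_length, _ = zigzag_decode(framed)  # bad_length == 0, not 7
```

So if the producer is writing anything other than the bare Avro datum, the reader's schema-access strategy has to account for that framing rather than the schema itself.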

Thanks



On Wed, Mar 21, 2018 at 9:33 PM, Colin Williams
<colin.williams.seat...@gmail.com> wrote:
> Hi Joe,
>
> I don't believe the Avro schema is included, and I expect the messages
> are just the data portion... I think that's why I need to use the avsc
> file mentioned above...
>
> On Wed, Mar 21, 2018 at 6:19 PM, Joe Witt <joe.w...@gmail.com> wrote:
>>
>> Can you share a template of your process group?
>>
>> Do the messages in Kafka have the Avro schema included in them or are
>> they just the data portion of the record?
>>
>> On Wed, Mar 21, 2018 at 9:16 PM, Colin Williams
>> <colin.williams.seat...@gmail.com> wrote:
>> > I have an Avro avsc file for a table with a definition like:
>> >
>> >
>> > {
>> >   "type": "record",
>> >   "name": "INV_ADJ",
>> >   "namespace": "NSP_SCH",
>> >   "fields": [
>> >     {"name": "table", "type": "string"},
>> >     {"name": "op_type", "type": "string"},
>> >     {"name": "op_ts", "type": "string"},
>> >     {"name": "current_ts", "type": "string"},
>> >     {"name": "pos", "type": "string"},
>> >     {"name": "primary_keys", "type": {"type": "array", "items": "string"}},
>> >     {"name": "tokens", "type": {"type": "map", "values": "string"}, "default": {}},
>> >     {"name": "before", "type": ["null", {
>> >       "type": "record", "name": "columns", "fields": [
>> >         {"name": "ITEM", "type": ["null", "string"], "default": null},
>> >         {"name": "ITEM_isMissing", "type": "boolean"},
>> >         {"name": "INV_STATUS", "type": ["null", "long"], "default": null},
>> >         {"name": "INV_STATUS_isMissing", "type": "boolean"},
>> >         {"name": "LOC_TYPE", "type": ["null", "string"], "default": null},
>> >         {"name": "LOC_TYPE_isMissing", "type": "boolean"},
>> >         {"name": "LOCATION", "type": ["null", "long"], "default": null},
>> >         {"name": "LOCATION_isMissing", "type": "boolean"},
>> >         {"name": "ADJ_QTY", "type": ["null", "double"], "default": null},
>> >         {"name": "ADJ_QTY_isMissing", "type": "boolean"},
>> >         {"name": "REASON", "type": ["null", "long"], "default": null},
>> >         {"name": "REASON_isMissing", "type": "boolean"},
>> >         {"name": "ADJ_DATE", "type": ["null", "string"], "default": null},
>> >         {"name": "ADJ_DATE_isMissing", "type": "boolean"},
>> >         {"name": "PREV_QTY", "type": ["null", "double"], "default": null},
>> >         {"name": "PREV_QTY_isMissing", "type": "boolean"},
>> >         {"name": "USER_ID", "type": ["null", "string"], "default": null},
>> >         {"name": "USER_ID_isMissing", "type": "boolean"},
>> >         {"name": "ADJ_WEIGHT", "type": ["null", "double"], "default": null},
>> >         {"name": "ADJ_WEIGHT_isMissing", "type": "boolean"},
>> >         {"name": "ADJ_WEIGHT_UOM", "type": ["null", "string"], "default": null},
>> >         {"name": "ADJ_WEIGHT_UOM_isMissing", "type": "boolean"},
>> >         {"name": "CREATE_ID", "type": ["null", "string"], "default": null},
>> >         {"name": "CREATE_ID_isMissing", "type": "boolean"},
>> >         {"name": "CREATE_DATETIME", "type": ["null", "string"], "default": null},
>> >         {"name": "CREATE_DATETIME_isMissing", "type": "boolean"}
>> >       ]}],
>> >      "default": null},
>> >     {"name": "after", "type": ["null", "columns"], "default": null}
>> >   ]
>> > }
>> >
>> > I have a kafka topic which should contain avro records using the above
>> > definition.
>> >
>> > I've configured the Avro schema registry, reader, and writer with the
>> > above definition. When I try running my NiFi workflow I get exceptions
>> > like "invalid int encoding" and don't seem to process any data.
>> >
>> > What am I doing wrong?
>
>
