[ 
https://issues.apache.org/jira/browse/NIFI-4639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269748#comment-16269748
 ] 

ASF GitHub Bot commented on NIFI-4639:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2292#discussion_r153658187
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---
    @@ -106,13 +106,15 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
             Record record;
             int recordCount = 0;
     
    -        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
    +        try {
                 while ((record = recordSet.next()) != null) {
                     recordCount++;
                     baos.reset();
     
    -                writer.write(record);
    -                writer.flush();
    +                try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
    --- End diff --
    
    I think this is the right way to go because the Avro API only writes the 
schema once per Avro writer (AFAIK). 
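
To illustrate the reasoning, here is a minimal, self-contained sketch of the two loop shapes in the diff. It does not use the NiFi or Avro APIs: `HeaderWriter` is a hypothetical stand-in that, by assumption, writes a schema header once when it is constructed, and `sharedWriter` / `writerPerRecord` are made-up names for the old and new loop structure.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HeaderPerWriterDemo {

    // Hypothetical stand-in for a record writer that emits its schema
    // header exactly once, at construction time (an assumption made for
    // this sketch, not the real RecordSetWriter contract).
    static final class HeaderWriter implements AutoCloseable {
        private final ByteArrayOutputStream out;

        HeaderWriter(final ByteArrayOutputStream out) {
            this.out = out;
            write("SCHEMA|");
        }

        void write(final String text) {
            final byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }

        @Override
        public void close() {
            // Nothing to release in this sketch.
        }
    }

    // Old shape: one writer for the whole record set. The reset() at the
    // top of each iteration discards the header written at construction.
    static List<String> sharedWriter(final List<String> records) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final List<String> messages = new ArrayList<>();
        try (HeaderWriter writer = new HeaderWriter(baos)) {
            for (final String record : records) {
                baos.reset();
                writer.write(record);
                messages.add(new String(baos.toByteArray(), StandardCharsets.UTF_8));
            }
        }
        return messages;
    }

    // New shape: reset first, then create a writer per record, so every
    // message starts with its own header.
    static List<String> writerPerRecord(final List<String> records) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final List<String> messages = new ArrayList<>();
        for (final String record : records) {
            baos.reset();
            try (HeaderWriter writer = new HeaderWriter(baos)) {
                writer.write(record);
            }
            messages.add(new String(baos.toByteArray(), StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(final String[] args) {
        final List<String> records = Arrays.asList("a", "b");
        System.out.println(sharedWriter(records));    // [a, b]
        System.out.println(writerPerRecord(records)); // [SCHEMA|a, SCHEMA|b]
    }
}
```

In the shared-writer variant no message carries the header, which matches the symptom reported in the issue; creating the writer after the reset restores it for every record.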


> PublishKafkaRecord with Avro writer: schema lost from output
> ------------------------------------------------------------
>
>                 Key: NIFI-4639
>                 URL: https://issues.apache.org/jira/browse/NIFI-4639
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.4.0
>            Reporter: Matthew Silverman
>         Attachments: Demo_Names_NiFi_bug.xml
>
>
> I have a {{PublishKafkaRecord_0_10}} configured with an 
> {{AvroRecordSetWriter}}, in turn configured to "Embed Avro Schema".  However, 
> when I consume data from the Kafka stream I receive individual records that 
> lack a schema header.
> As a workaround, I can send the flow files through a {{SplitRecord}} 
> processor, which does embed the Avro schema into each resulting flow file.
> Comparing the code for {{SplitRecord}} and the {{PublishKafkaRecord}} 
> processors, I believe the issue is that {{PublisherLease}} wipes the output 
> stream after calling {{createWriter}}; however it is 
> {{AvroRecordSetWriter#createWriter}} that writes the Avro header to the 
> output stream.  {{SplitRecord}}, on the other hand, creates a new writer for 
> each output record.
> I've attached my flow.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
