markap14 commented on a change in pull request #4901:
URL: https://github.com/apache/nifi/pull/4901#discussion_r726511959



##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java
##########
@@ -232,64 +238,77 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
     public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
 
         try {
-            WriteResult writeResult;
             final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
             int recordCount = 0;
             try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
-                writer.beginRecordSet();
                 Record record;
                 while ((record = recordSet.next()) != null) {
+                    baos.reset();
+                    out.reset();
                     writer.write(record);
+                    writer.flush();
                     recordCount++;
                     if (out.getBytesWritten() > maxMessageSize) {
-                        throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
+                        throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                     }
+                    sendMessage(topic, baos.toByteArray());
                 }
-                writeResult = writer.finishRecordSet();
                 if (out.getBytesWritten() > maxMessageSize) {
-                    throw new TokenTooLargeException("The query's result set size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
+                    throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                 }
-                recordCount = writeResult.getRecordCount();
-
                 attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                 attributes.put("record.count", Integer.toString(recordCount));
-                attributes.putAll(writeResult.getAttributes());
             }
 
-            if (recordCount > 0 || sendZeroResults) {
-                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, baos.toByteArray());
-                try {
-                    producer.send(record, (metadata, exception) -> {
-                        if (exception != null) {
-                            throw new KafkaSendException(exception);
-                        }
-                    }).get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
-                } catch (KafkaSendException kse) {
-                    Throwable t = kse.getCause();
-                    if (t instanceof IOException) {
-                        throw (IOException) t;
-                    } else {
-                        throw new IOException(t);
-                    }
-                } catch (final InterruptedException e) {
-                    getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
-                    Thread.currentThread().interrupt();
-                } catch (final TimeoutException e) {
-                    getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
+            if (recordCount == 0) {
+                if (sendZeroResults) {
+                    sendMessage(topic, new byte[0]);
+                } else {
+                    return WriteResult.EMPTY;
                 }
-            } else {
-                writeResult = WriteResult.EMPTY;
             }
 
-            return writeResult;
+            acknowledgeTransmission();
+
+            return WriteResult.of(recordCount, attributes);
         } catch (IOException ioe) {
             throw ioe;
         } catch (Exception e) {
             throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e);
         }
+    }
 
+    public void sendMessage(String topic, byte[] payload) throws IOException, ExecutionException {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, payload);
+        // Add the Future to the queue
+        ackQ.add(producer.send(record, (metadata, exception) -> {
+            if (exception != null) {
+                throw new KafkaSendException(exception);
+            }
+        }));

Review comment:
       I don't think the callback is necessary here, is it? If the send fails, the call to `Future.get()` should throw the Exception. I don't think we need to create another Exception here?
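
To illustrate the point, here is a minimal, self-contained sketch (not the PR's code) of sending without a callback. The class name, fields, and the `acknowledgeTransmission()` drain step are assumptions that mirror the shape of the sink; the point being shown is that a failed send surfaces as an `ExecutionException` from `Future.get()`, so a callback that throws `KafkaSendException` is redundant.

```java
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch only: mirrors the shape of the sink's fields, not the committed code.
final class CallbackFreeSendSketch {
    private final Producer<byte[], byte[]> producer;
    private final Queue<Future<RecordMetadata>> ackQ = new ConcurrentLinkedQueue<>();
    private final long maxAckWaitMillis;

    CallbackFreeSendSketch(final Producer<byte[], byte[]> producer, final long maxAckWaitMillis) {
        this.producer = producer;
        this.maxAckWaitMillis = maxAckWaitMillis;
    }

    // No callback: just queue the Future returned by send().
    void sendMessage(final String topic, final byte[] payload) {
        ackQ.add(producer.send(new ProducerRecord<>(topic, null, null, payload)));
    }

    // Future.get() rethrows a failed send as an ExecutionException, so a
    // callback that throws KafkaSendException adds nothing here.
    void acknowledgeTransmission() throws IOException {
        try {
            Future<RecordMetadata> ack;
            while ((ack = ackQ.poll()) != null) {
                ack.get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
            }
        } catch (final ExecutionException e) {
            final Throwable cause = e.getCause();
            throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for a Kafka acknowledgement", e);
        } catch (final TimeoutException e) {
            throw new IOException("Timed out while waiting for a Kafka acknowledgement", e);
        }
    }
}
```

Note, too, that an exception thrown from inside the producer callback runs on Kafka's I/O thread, so it would generally not propagate to the calling code anyway, which is another reason to rely on `Future.get()`.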

##########
File path: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java
##########
@@ -232,64 +238,77 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
     public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
 
         try {
-            WriteResult writeResult;
             final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
             int recordCount = 0;
             try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
-                writer.beginRecordSet();
                 Record record;
                 while ((record = recordSet.next()) != null) {

Review comment:
       This is an unbounded RecordSet, which could quickly lead to heap exhaustion. It might make sense to send only 100 or 1,000 records (or some configurable number of records), wait for the results once those have been sent, and then send another batch.
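
To make the suggestion concrete, here is a minimal, self-contained sketch of bounded batching (not the PR's code); the class name `BatchedSendSketch`, the `batchSize` parameter, and the `awaitAcks` helper are illustrative only. It sends at most `batchSize` records, blocks for their acknowledgements, and only then pulls more records, so the number of in-flight Futures stays bounded regardless of the RecordSet's size.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch only: illustrative names, not the PR's API.
final class BatchedSendSketch {

    // Send serialized records in bounded batches: after `batchSize` sends, block
    // for those acknowledgements before pulling more records from the iterator.
    static void sendInBatches(final Producer<byte[], byte[]> producer, final String topic,
                              final Iterator<byte[]> payloads, final int batchSize,
                              final long maxAckWaitMillis) throws IOException {
        final Queue<Future<RecordMetadata>> inFlight = new ArrayDeque<>();
        while (payloads.hasNext()) {
            inFlight.add(producer.send(new ProducerRecord<>(topic, null, null, payloads.next())));
            if (inFlight.size() >= batchSize) {
                awaitAcks(inFlight, maxAckWaitMillis);
            }
        }
        awaitAcks(inFlight, maxAckWaitMillis); // drain the final, partial batch
    }

    // Wait for every queued Future; Future.get() rethrows any send failure.
    private static void awaitAcks(final Queue<Future<RecordMetadata>> inFlight,
                                  final long maxAckWaitMillis) throws IOException {
        try {
            Future<RecordMetadata> ack;
            while ((ack = inFlight.poll()) != null) {
                ack.get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
            }
        } catch (final ExecutionException | TimeoutException e) {
            throw new IOException("Failed to get an acknowledgement from Kafka", e);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for a Kafka acknowledgement", e);
        }
    }
}
```

In the sink itself, the same idea would amount to waiting on the queued Futures inside the while loop every N records (for some configurable N) rather than once at the end of `sendData`.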



