This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 82bac859ef NIFI-12371 Support tombstone messages in non-record Kafka processors
82bac859ef is described below

commit 82bac859ef1a1989557737fe94bdc0ece4e1778c
Author: Pierre Villard <pierre.villard...@gmail.com>
AuthorDate: Fri Nov 10 13:08:59 2023 -0600

    NIFI-12371 Support tombstone messages in non-record Kafka processors
    
    This closes #8076
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
    
    (cherry picked from commit ee2368e0ae684d8a3ca2e62ce89422a2e260bdce)
---
 .../apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java |  3 ++-
 .../apache/nifi/processors/kafka/pubsub/ConsumerLease.java    |  2 ++
 .../apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java |  3 +++
 .../apache/nifi/processors/kafka/pubsub/PublisherLease.java   | 11 ++++++++---
 .../nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java   |  2 ++
 5 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 8bd3398747..019f7daa55 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -75,7 +75,8 @@ import java.util.regex.Pattern;
     @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
-    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from"),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 3883842694..863006b4db 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -474,6 +474,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         final byte[] value = record.value();
         if (value != null) {
             flowFile = session.write(flowFile, out -> out.write(value));
+        } else {
+            flowFile = session.putAttribute(flowFile, KafkaFlowFileAttribute.KAFKA_TOMBSTONE, Boolean.TRUE.toString());
         }
         flowFile = session.putAllAttributes(flowFile, getAttributes(record));
         tracker.updateFlowFile(flowFile);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index acab0fe1cd..cf047f3d23 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -87,6 +88,8 @@ import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFK
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured "
+        + "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.")
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
     + "be greater than 1.")
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index ab4a283d3a..3664bb810d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
 import org.apache.nifi.kafka.shared.property.PublishStrategy;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -160,9 +161,13 @@ public class PublisherLease implements Closeable {
                     tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
                     return;
                 }
-                // Send FlowFile content as it is, to support sending 0 byte message.
-                messageContent = new byte[(int) flowFile.getSize()];
-                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                if (Boolean.TRUE.toString().equals(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_TOMBSTONE)) && flowFile.getSize() == 0) {
+                    messageContent = null;
+                } else {
+                    // Send FlowFile content as it is, to support sending 0 byte message.
+                    messageContent = new byte[(int) flowFile.getSize()];
+                    StreamUtils.fillBuffer(flowFileContent, messageContent);
+                }
                 publish(flowFile, messageKey, messageContent, topic, tracker, partition);
                 return;
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java
index 991f3b7a9e..15aa16ed76 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java
@@ -39,4 +39,6 @@ public interface KafkaFlowFileAttribute {
     String KAFKA_CONSUMER_GROUP_ID = "kafka.consumer.id";
 
     String KAFKA_CONSUMER_OFFSETS_COMMITTED = "kafka.consumer.offsets.committed";
+
+    String KAFKA_TOMBSTONE = "kafka.tombstone";
 }

Reply via email to