This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b426226207 NIFI-10993 - PublishKafkaRecord should use correct record schema
b426226207 is described below

commit b426226207e031b2808a3a759ebf6ab05d1400f9
Author: Paul Grey <gre...@yahoo.com>
AuthorDate: Mon Dec 19 18:44:20 2022 -0500

    NIFI-10993 - PublishKafkaRecord should use correct record schema
    
    This closes #6833.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../processors/kafka/pubsub/PublisherLease.java    |   2 +-
 .../kafka/pubsub/TestPublishKafkaMock.java         | 205 +++++++++++++++++----
 2 files changed, 175 insertions(+), 32 deletions(-)
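
In short, the fix makes PublisherLease resolve the schema to write through
the configured RecordSetWriterFactory instead of reusing the schema inferred
while reading the incoming record. A minimal before/after sketch of the
changed line (context abbreviated from the PublisherLease diff below):

    // Before: the schema inferred by the record reader was used verbatim,
    // so the writer's configured Schema Access Strategy was never applied.
    final RecordSchema schema = record.getSchema();

    // After: the writer factory resolves the schema, e.g. via a schema
    // registry lookup driven by the flowfile attributes, so a registry
    // schema (including defaulted fields) is honored.
    final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), record.getSchema());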

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index da37db0319..ab4a283d3a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -356,7 +356,7 @@ public class PublisherLease implements Closeable {
             }
 
             final Record record = (Record) object;
-            final RecordSchema schema = record.getSchema();
+            final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), record.getSchema());
             try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                  final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
                 writer.write(record);
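
For reference, getSchema here is the standard RecordSetWriterFactory method
from the NiFi record-serialization-service API; a sketch of its contract
(signature as in the API, comment paraphrased):

    // Resolves the schema the writer should use. The variables map (here the
    // flowfile attributes) drives strategies such as "Use 'Schema Name'
    // Property"; readSchema is the schema of the data as read, used by the
    // "Inherit Record Schema" strategy.
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema)
            throws SchemaNotFoundException, IOException;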
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
index e379dc70bf..64b1bf248b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
@@ -19,12 +19,16 @@ package org.apache.nifi.processors.kafka.pubsub;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
+import org.apache.nifi.avro.AvroRecordSetWriter;
 import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.kafka.shared.property.PublishStrategy;
@@ -35,13 +39,21 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -146,14 +158,16 @@ public class TestPublishKafkaMock {
     @Test
     public void testPublishRecordWrapperStrategyNullKey() throws JsonProcessingException, InitializationException {
         // create flowfile to publish
-        final Map<String, String> attributes = new TreeMap<>();
-        attributes.put("attrKeyA", "attrValueA");
-        attributes.put("attrKeyB", "attrValueB");
-        attributes.put("messageKey", "this-is-a-key");
-        final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
-        final String value = mapper.writeValueAsString(node);
+        final ObjectNode valueNode = mapper.createObjectNode()
+                .put("recordA", 1).put("recordB", "valueB");
+        final ObjectNode recordNode = mapper.createObjectNode();
+        recordNode.set("metadata", mapper.createObjectNode()
+                .put("topic", TEST_TOPIC_PUBLISH));
+        recordNode.set("headers", mapper.createObjectNode()
+                .put("attrKeyA", "attrValueA").put("attrKeyB", "attrValueB"));
+        recordNode.set("value", valueNode);
+        final String value = mapper.writeValueAsString(recordNode);
         final MockFlowFile flowFile = new MockFlowFile(++ordinal);
-        flowFile.putAttributes(attributes);
         flowFile.setData(value.getBytes(UTF_8));
         // publish flowfile
         final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
@@ -174,20 +188,26 @@ public class TestPublishKafkaMock {
         assertEquals("attrValueB", new 
String(headers.lastHeader("attrKeyB").value(), UTF_8));
         assertNull(record.key());
         assertNotNull(record.value());
-        assertEquals(value, new String(record.value(), UTF_8));
+        final String valueStringExpected = mapper.writeValueAsString(valueNode);
+        final String valueStringActual = new String(record.value(), UTF_8);
+        assertEquals(valueStringExpected, valueStringActual);
     }
 
     @Test
     public void testPublishRecordWrapperStrategyStringKey() throws JsonProcessingException, InitializationException {
         // create flowfile to publish
-        final Map<String, String> attributes = new TreeMap<>();
-        attributes.put("attrKeyA", "attrValueA");
-        attributes.put("attrKeyB", "attrValueB");
-        attributes.put("messageKey", "this-is-a-key");
-        final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
-        final String value = mapper.writeValueAsString(node);
+        final ObjectNode metadataNode = mapper.createObjectNode()
+                .put("topic", TEST_TOPIC_PUBLISH);
+        final ObjectNode headersNode = mapper.createObjectNode()
+                .put("attrKeyA", "attrValueA").put("attrKeyB", "attrValueB");
+        final ObjectNode valueNode = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
+        final ObjectNode recordNode = mapper.createObjectNode();
+        recordNode.set("metadata", metadataNode);
+        recordNode.set("headers", headersNode);
+        recordNode.put("key", "valueB");
+        recordNode.set("value", valueNode);
+        final String value = mapper.writeValueAsString(recordNode);
         final MockFlowFile flowFile = new MockFlowFile(++ordinal);
-        flowFile.putAttributes(attributes);
         flowFile.setData(value.getBytes(UTF_8));
         // publish flowfile
         final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
@@ -204,12 +224,13 @@ public class TestPublishKafkaMock {
         final ProducerRecord<byte[], byte[]> record = producedRecords.iterator().next();
         assertEquals(TEST_TOPIC_PUBLISH, record.topic());
         final Headers headers = record.headers();
-        assertEquals(1, headers.toArray().length);
+        assertEquals(2, headers.toArray().length);
         assertEquals("attrValueB", new 
String(headers.lastHeader("attrKeyB").value(), UTF_8));
         assertNotNull(record.key());
         assertEquals("valueB", new String(record.key(), UTF_8));
         assertNotNull(record.value());
-        assertEquals(value, new String(record.value(), UTF_8));
+        final String valueString = mapper.writeValueAsString(valueNode);
+        assertEquals(valueString, new String(record.value(), UTF_8));
     }
 
     @Test
@@ -219,8 +240,11 @@ public class TestPublishKafkaMock {
         attributes.put("attrKeyA", "attrValueA");
         attributes.put("attrKeyB", "attrValueB");
         attributes.put("messageKey", "this-is-a-key");
-        final ObjectNode node = mapper.createObjectNode().put("recordA", 
1).put("recordB", "valueB");
-        final String value = mapper.writeValueAsString(node);
+        final ObjectNode valueNode = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
+        final ObjectNode recordNode = mapper.createObjectNode();
+        recordNode.put("key", "valueB");
+        recordNode.set("value", valueNode);
+        final String value = mapper.writeValueAsString(recordNode);
         final MockFlowFile flowFile = new MockFlowFile(++ordinal);
         flowFile.putAttributes(attributes);
         flowFile.setData(value.getBytes(UTF_8));
@@ -239,7 +263,8 @@ public class TestPublishKafkaMock {
         assertEquals(1, producedRecords.size());
         final ProducerRecord<byte[], byte[]> producedRecord = producedRecords.iterator().next();
         assertEquals("valueB", new String(producedRecord.key(), UTF_8));
-        assertEquals(value, new String(producedRecord.value(), UTF_8));
+        final String valueString = mapper.writeValueAsString(valueNode);
+        assertEquals(valueString, new String(producedRecord.value(), UTF_8));
         final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_SUCCESS);
         final MockFlowFile flowFile1 = success.iterator().next();
         assertNotNull(flowFile1.getAttribute("uuid"));
@@ -251,10 +276,13 @@ public class TestPublishKafkaMock {
         final Map<String, String> attributes = new TreeMap<>();
         attributes.put("attrKeyA", "attrValueA");
         attributes.put("attrKeyB", "attrValueB");
-        final ObjectNode key = mapper.createObjectNode().put("recordKey", "recordValue");
-        final ObjectNode node = mapper.createObjectNode()
-                .put("recordA", 1).put("recordB", "valueB").set("recordKey", 
key);
-        final String value = mapper.writeValueAsString(node);
+        final ObjectNode keyNode = mapper.createObjectNode().put("recordKey", "recordValue");
+        final ObjectNode valueNode = mapper.createObjectNode()
+                .put("recordA", 1).put("recordB", "valueB");
+        final ObjectNode recordNode = mapper.createObjectNode();
+        recordNode.set("key", keyNode);
+        recordNode.set("value", valueNode);
+        final String value = mapper.writeValueAsString(recordNode);
         final MockFlowFile flowFile = new MockFlowFile(++ordinal);
         flowFile.putAttributes(attributes);
         flowFile.setData(value.getBytes(UTF_8));
@@ -263,8 +291,7 @@ public class TestPublishKafkaMock {
         final TestRunner runner = getTestRunner(producedRecords);
         runner.setProperty("topic", TEST_TOPIC_PUBLISH);
         runner.setProperty("publish-strategy", 
PublishStrategy.USE_WRAPPER.name());
-        runner.setProperty("message-key-field", "recordKey");
-        runner.setProperty("record-key-writer", "record-writer");
+        runner.setProperty("record-key-writer", "record-key-writer");
         runner.enqueue(flowFile);
         runner.run(1);
         // verify results
@@ -275,10 +302,13 @@ public class TestPublishKafkaMock {
         final Headers headers = record.headers();
         assertEquals(0, headers.toArray().length);
         assertNotNull(record.key());
-        final String keyString = mapper.writeValueAsString(key);
-        assertEquals(keyString, new String(record.key(), UTF_8));
+        final String keyStringExpected = mapper.writeValueAsString(keyNode);
+        final String keyStringActual = new String(record.key(), UTF_8);
+        assertEquals(keyStringExpected, keyStringActual);
         assertNotNull(record.value());
-        assertEquals(value, new String(record.value(), UTF_8));
+        final String valueStringExpected = mapper.writeValueAsString(valueNode);
+        final String valueStringActual = new String(record.value(), UTF_8);
+        assertEquals(valueStringExpected, valueStringActual);
     }
 
     private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
@@ -309,6 +339,119 @@ public class TestPublishKafkaMock {
         return runner;
     }
 
+    @Test
+    public void testPublishRecordWrapperStrategyRecordKeySchema() throws IOException, InitializationException {
+        // create flowfile to publish
+        final Map<String, String> attributes = new TreeMap<>();
+        attributes.put("schema-access-strategy", "schema-name");
+        attributes.put("schema.name", "schemaValue");
+        attributes.put("schema.key.name", "schemaKey");
+        final ObjectNode recordKey = mapper.createObjectNode()
+                .put("keyA", "value1")
+                .put("keyB", "value2");
+        final ObjectNode recordValue = mapper.createObjectNode()
+                .put("valueA", "value1")
+                .put("valueB", "value2");
+        final ObjectNode record = mapper.createObjectNode();
+        record.set("key", recordKey);
+        record.set("value", recordValue);
+        final String value = mapper.writeValueAsString(record);
+        final MockFlowFile flowFile = new MockFlowFile(++ordinal);
+        flowFile.putAttributes(attributes);
+        flowFile.setData(value.getBytes(UTF_8));
+        // publish flowfile
+        final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
+        final TestRunner runner = getTestRunnerSchemaRegistry(producedRecords);
+        runner.setProperty("topic", TEST_TOPIC_PUBLISH);
+        runner.setProperty("publish-strategy", 
PublishStrategy.USE_WRAPPER.name());
+        runner.setProperty("record-key-writer", "record-key-writer");
+        runner.enqueue(flowFile);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
+        assertEquals(1, producedRecords.size());
+        final ProducerRecord<byte[], byte[]> producerRecord = producedRecords.iterator().next();
+
+        final DataFileStream<GenericData.Record> dataReaderKey = new DataFileStream<>(
+                new ByteArrayInputStream(producerRecord.key()), new GenericDatumReader<>(null));
+        final GenericData.Record genericRecordKey = dataReaderKey.next();
+        assertEquals("value1", genericRecordKey.get("keyA").toString());
+        assertEquals("value2", genericRecordKey.get("keyB").toString());
+        assertEquals("value3", genericRecordKey.get("keyC").toString());
+
+        final DataFileStream<GenericData.Record> dataReaderValue = new DataFileStream<>(
+                new ByteArrayInputStream(producerRecord.value()), new GenericDatumReader<>(null));
+        final GenericData.Record genericRecordValue = dataReaderValue.next();
+        assertEquals("value1", genericRecordValue.get("valueA").toString());
+        assertEquals("value2", genericRecordValue.get("valueB").toString());
+        assertEquals("value3", genericRecordValue.get("valueC").toString());
+    }
+
+    private TestRunner getTestRunnerSchemaRegistry(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
+            throws InitializationException {
+        final RecordSchema schemaKey = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("keyA", RecordFieldType.STRING.getDataType()),
+                new RecordField("keyB", RecordFieldType.STRING.getDataType()),
+                new RecordField("keyC", RecordFieldType.STRING.getDataType(), 
"value3")));
+        final RecordSchema schemaValue = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("valueA", RecordFieldType.STRING.getDataType()),
+                new RecordField("valueB", RecordFieldType.STRING.getDataType()),
+                new RecordField("valueC", RecordFieldType.STRING.getDataType(), "value3")));
+        final RecordSchema schemaRecord = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("key", RecordFieldType.RECORD.getRecordDataType(schemaKey)),
+                new RecordField("value", RecordFieldType.RECORD.getRecordDataType(schemaValue))));
+
+        final String schemaRegistryId = "schema-registry";
+        final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
+        schemaRegistry.addSchema("schemaKey", schemaKey);
+        schemaRegistry.addSchema("schemaValue", schemaValue);
+        schemaRegistry.addSchema("schemaRecord", schemaRecord);
+
+        final String readerId = "record-reader";
+        final RecordReaderFactory readerService = new JsonTreeReader();
+
+        final Map<String, String> propertiesReaderService = new TreeMap<>();
+
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new AvroRecordSetWriter();
+        final String keyWriterId = "record-key-writer";
+        final RecordSetWriterFactory keyWriterService = new AvroRecordSetWriter();
+
+        final Map<String, String> propertiesWriterService = new TreeMap<>();
+        propertiesWriterService.put(schemaRegistryId, schemaRegistryId);
+        propertiesWriterService.put("schema-access-strategy", "schema-name");
+        propertiesWriterService.put("schema-name", "schemaValue");
+        final Map<String, String> propertiesWriterServiceKey = new TreeMap<>();
+        propertiesWriterServiceKey.put(schemaRegistryId, schemaRegistryId);
+        propertiesWriterServiceKey.put("schema-access-strategy", 
"schema-name");
+        propertiesWriterServiceKey.put("schema-name", "schemaKey");
+
+        final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return getPublisherPool(producedRecords, context);
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setValidateExpressionUsage(false);
+
+        runner.addControllerService(schemaRegistryId, schemaRegistry);
+        runner.enableControllerService(schemaRegistry);
+
+        runner.addControllerService(readerId, readerService, propertiesReaderService);
+        runner.enableControllerService(readerService);
+        runner.setProperty(readerId, readerId);
+
+        runner.addControllerService(writerId, writerService, propertiesWriterService);
+        runner.enableControllerService(writerService);
+        runner.setProperty(writerId, writerId);
+
+        runner.addControllerService(keyWriterId, keyWriterService, propertiesWriterServiceKey);
+        runner.enableControllerService(keyWriterService);
+        runner.setProperty(keyWriterId, keyWriterId);
+
+        return runner;
+    }
+
     private PublisherPool getPublisherPool(final Collection<ProducerRecord<byte[], byte[]>> producedRecords,
                                            final ProcessContext context) {
         final int maxMessageSize = context.getProperty("max.request.size").asDataSize(DataUnit.B).intValue();
@@ -350,7 +493,7 @@ public class TestPublishKafkaMock {
         final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex);
         final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer")
                 .asControllerService(RecordSetWriterFactory.class);
-
+        final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
         final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
         when(producer.send(any(), any())).then(invocation -> {
             final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
@@ -368,7 +511,7 @@ public class TestPublishKafkaMock {
                 true,
                 patternAttributeName,
                 UTF_8,
-                null,
+                publishStrategy,
                 keyWriterFactory);
     }
 }
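
The keyC/valueC assertions in the new test are its crux: those fields are
absent from the enqueued JSON and are declared only in the MockSchemaRegistry
schemas, where the three-argument RecordField constructor supplies a default
value. Condensed from the test setup above (a sketch of the mechanism, not
additional behavior):

    // A field present only in the registry schema, with a declared default,
    // surfaces in the serialized Avro output because the writer now resolves
    // the registry schema instead of the inferred record schema.
    new RecordField("keyC", RecordFieldType.STRING.getDataType(), "value3");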
