This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new b426226207 NIFI-10993 - PublishKafkaRecord should use correct record schema b426226207 is described below commit b426226207e031b2808a3a759ebf6ab05d1400f9 Author: Paul Grey <gre...@yahoo.com> AuthorDate: Mon Dec 19 18:44:20 2022 -0500 NIFI-10993 - PublishKafkaRecord should use correct record schema This closes #6833. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../processors/kafka/pubsub/PublisherLease.java | 2 +- .../kafka/pubsub/TestPublishKafkaMock.java | 205 +++++++++++++++++---- 2 files changed, 175 insertions(+), 32 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index da37db0319..ab4a283d3a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -356,7 +356,7 @@ public class PublisherLease implements Closeable { } final Record record = (Record) object; - final RecordSchema schema = record.getSchema(); + final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), record.getSchema()); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) { writer.write(record); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java index e379dc70bf..64b1bf248b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java @@ -19,12 +19,16 @@ package org.apache.nifi.processors.kafka.pubsub; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; +import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.kafka.shared.property.PublishStrategy; @@ -35,13 +39,21 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MockSchemaRegistry; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -146,14 +158,16 @@ public class TestPublishKafkaMock { @Test public void testPublishRecordWrapperStrategyNullKey() throws JsonProcessingException, InitializationException { // create flowfile to publish - final Map<String, String> attributes = new TreeMap<>(); - attributes.put("attrKeyA", "attrValueA"); - attributes.put("attrKeyB", "attrValueB"); - attributes.put("messageKey", "this-is-a-key"); - final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB"); - final String value = mapper.writeValueAsString(node); + final ObjectNode valueNode = mapper.createObjectNode() + .put("recordA", 1).put("recordB", "valueB"); + final ObjectNode recordNode = mapper.createObjectNode(); + recordNode.set("metadata", mapper.createObjectNode() + .put("topic", TEST_TOPIC_PUBLISH)); + recordNode.set("headers", mapper.createObjectNode() + .put("attrKeyA", "attrValueA").put("attrKeyB", "attrValueB")); + recordNode.set("value", valueNode); + final String value = mapper.writeValueAsString(recordNode); final MockFlowFile flowFile = new MockFlowFile(++ordinal); - flowFile.putAttributes(attributes); flowFile.setData(value.getBytes(UTF_8)); // publish flowfile final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>(); @@ -174,20 +188,26 @@ public class TestPublishKafkaMock { assertEquals("attrValueB", new String(headers.lastHeader("attrKeyB").value(), UTF_8)); assertNull(record.key()); assertNotNull(record.value()); - assertEquals(value, new String(record.value(), UTF_8)); + final String valueStringExpected = mapper.writeValueAsString(valueNode); + final String valueStringActual = new String(record.value(), UTF_8); + assertEquals(valueStringExpected, valueStringActual); } @Test public void testPublishRecordWrapperStrategyStringKey() throws JsonProcessingException, InitializationException { // create flowfile to publish - final Map<String, String> attributes = new TreeMap<>(); - attributes.put("attrKeyA", "attrValueA"); - attributes.put("attrKeyB", "attrValueB"); - attributes.put("messageKey", "this-is-a-key"); - final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB"); - final String value = mapper.writeValueAsString(node); + final ObjectNode metadataNode = mapper.createObjectNode() + .put("topic", TEST_TOPIC_PUBLISH); + final ObjectNode headersNode = mapper.createObjectNode() + .put("attrKeyA", "attrValueA").put("attrKeyB", "attrValueB"); + final ObjectNode valueNode = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB"); + final ObjectNode recordNode = mapper.createObjectNode(); + recordNode.set("metadata", metadataNode); + recordNode.set("headers", headersNode); + recordNode.put("key", "valueB"); + recordNode.set("value", valueNode); + final String value = mapper.writeValueAsString(recordNode); final MockFlowFile flowFile = new MockFlowFile(++ordinal); - flowFile.putAttributes(attributes); flowFile.setData(value.getBytes(UTF_8)); // publish flowfile final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>(); @@ -204,12 +224,13 @@ public class TestPublishKafkaMock { final ProducerRecord<byte[], byte[]> record = producedRecords.iterator().next(); assertEquals(TEST_TOPIC_PUBLISH, record.topic()); final Headers headers = record.headers(); - assertEquals(1, headers.toArray().length); + assertEquals(2, headers.toArray().length); assertEquals("attrValueB", new String(headers.lastHeader("attrKeyB").value(), UTF_8)); assertNotNull(record.key()); assertEquals("valueB", new String(record.key(), UTF_8)); assertNotNull(record.value()); - assertEquals(value, new String(record.value(), UTF_8)); + final String valueString = mapper.writeValueAsString(valueNode); + assertEquals(valueString, new String(record.value(), UTF_8)); } @Test @@ -219,8 +240,11 @@ public class TestPublishKafkaMock { attributes.put("attrKeyA", "attrValueA"); attributes.put("attrKeyB", "attrValueB"); attributes.put("messageKey", "this-is-a-key"); - final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB"); - final String value = mapper.writeValueAsString(node); + final ObjectNode valueNode = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB"); + final ObjectNode recordNode = mapper.createObjectNode(); + recordNode.put("key", "valueB"); + recordNode.set("value", valueNode); + final String value = mapper.writeValueAsString(recordNode); final MockFlowFile flowFile = new MockFlowFile(++ordinal); flowFile.putAttributes(attributes); flowFile.setData(value.getBytes(UTF_8)); @@ -239,7 +263,8 @@ public class TestPublishKafkaMock { assertEquals(1, producedRecords.size()); final ProducerRecord<byte[], byte[]> producedRecord = producedRecords.iterator().next(); assertEquals("valueB", new String(producedRecord.key(), UTF_8)); - assertEquals(value, new String(producedRecord.value(), UTF_8)); + final String valueString = mapper.writeValueAsString(valueNode); + assertEquals(valueString, new String(producedRecord.value(), UTF_8)); final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_SUCCESS); final MockFlowFile flowFile1 = success.iterator().next(); assertNotNull(flowFile1.getAttribute("uuid")); @@ -251,10 +276,13 @@ public class TestPublishKafkaMock { final Map<String, String> attributes = new TreeMap<>(); attributes.put("attrKeyA", "attrValueA"); attributes.put("attrKeyB", "attrValueB"); - final ObjectNode key = mapper.createObjectNode().put("recordKey", "recordValue"); - final ObjectNode node = mapper.createObjectNode() - .put("recordA", 1).put("recordB", "valueB").set("recordKey", key); - final String value = mapper.writeValueAsString(node); + final ObjectNode keyNode = mapper.createObjectNode().put("recordKey", "recordValue"); + final ObjectNode valueNode = mapper.createObjectNode() + .put("recordA", 1).put("recordB", "valueB"); + final ObjectNode recordNode = mapper.createObjectNode(); + recordNode.set("key", keyNode); + recordNode.set("value", valueNode); + final String value = mapper.writeValueAsString(recordNode); final MockFlowFile flowFile = new MockFlowFile(++ordinal); flowFile.putAttributes(attributes); flowFile.setData(value.getBytes(UTF_8)); @@ -263,8 +291,7 @@ public class TestPublishKafkaMock { final TestRunner runner = getTestRunner(producedRecords); runner.setProperty("topic", TEST_TOPIC_PUBLISH); runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name()); - runner.setProperty("message-key-field", "recordKey"); - runner.setProperty("record-key-writer", "record-writer"); + runner.setProperty("record-key-writer", "record-key-writer"); runner.enqueue(flowFile); runner.run(1); // verify results @@ -275,10 +302,13 @@ public class TestPublishKafkaMock { final Headers headers = record.headers(); assertEquals(0, headers.toArray().length); assertNotNull(record.key()); - final String keyString = mapper.writeValueAsString(key); - assertEquals(keyString, new String(record.key(), UTF_8)); + final String keyStringExpected = mapper.writeValueAsString(keyNode); + final String keyStringActual = new String(record.key(), UTF_8); + assertEquals(keyStringExpected, keyStringActual); assertNotNull(record.value()); - assertEquals(value, new String(record.value(), UTF_8)); + final String valueStringExpected = mapper.writeValueAsString(valueNode); + final String valueStringActual = new String(record.value(), UTF_8); + assertEquals(valueStringExpected, valueStringActual); } private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) @@ -309,6 +339,119 @@ public class TestPublishKafkaMock { return runner; } + @Test + public void testPublishRecordWrapperStrategyRecordKeySchema() throws IOException, InitializationException { + // create flowfile to publish + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("schema-access-strategy", "schema-name"); + attributes.put("schema.name", "schemaValue"); + attributes.put("schema.key.name", "schemaKey"); + final ObjectNode recordKey = mapper.createObjectNode() + .put("keyA", "value1") + .put("keyB", "value2"); + final ObjectNode recordValue = mapper.createObjectNode() + .put("valueA", "value1") + .put("valueB", "value2"); + final ObjectNode record = mapper.createObjectNode(); + record.set("key", recordKey); + record.set("value", recordValue); + final String value = mapper.writeValueAsString(record); + final MockFlowFile flowFile = new MockFlowFile(++ordinal); + flowFile.putAttributes(attributes); + flowFile.setData(value.getBytes(UTF_8)); + // publish flowfile + final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>(); + final TestRunner runner = getTestRunnerSchemaRegistry(producedRecords); + runner.setProperty("topic", TEST_TOPIC_PUBLISH); + runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name()); + runner.setProperty("record-key-writer", "record-key-writer"); + runner.enqueue(flowFile); + runner.run(1); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1); + assertEquals(1, producedRecords.size()); + final ProducerRecord<byte[], byte[]> producerRecord = producedRecords.iterator().next(); + + final DataFileStream<GenericData.Record> dataReaderKey = new DataFileStream<>( + new ByteArrayInputStream(producerRecord.key()), new GenericDatumReader<>(null)); + final GenericData.Record genericRecordKey = dataReaderKey.next(); + assertEquals("value1", genericRecordKey.get("keyA").toString()); + assertEquals("value2", genericRecordKey.get("keyB").toString()); + assertEquals("value3", genericRecordKey.get("keyC").toString()); + + final DataFileStream<GenericData.Record> dataReaderValue = new DataFileStream<>( + new ByteArrayInputStream(producerRecord.value()), new GenericDatumReader<>(null)); + final GenericData.Record genericRecordValue = dataReaderValue.next(); + assertEquals("value1", genericRecordValue.get("valueA").toString()); + assertEquals("value2", genericRecordValue.get("valueB").toString()); + assertEquals("value3", genericRecordValue.get("valueC").toString()); + } + + private TestRunner getTestRunnerSchemaRegistry(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) + throws InitializationException { + final RecordSchema schemaKey = new SimpleRecordSchema(Arrays.asList( + new RecordField("keyA", RecordFieldType.STRING.getDataType()), + new RecordField("keyB", RecordFieldType.STRING.getDataType()), + new RecordField("keyC", RecordFieldType.STRING.getDataType(), "value3"))); + final RecordSchema schemaValue = new SimpleRecordSchema(Arrays.asList( + new RecordField("valueA", RecordFieldType.STRING.getDataType()), + new RecordField("valueB", RecordFieldType.STRING.getDataType()), + new RecordField("valueC", RecordFieldType.STRING.getDataType(), "value3"))); + final RecordSchema schemaRecord = new SimpleRecordSchema(Arrays.asList( + new RecordField("key", RecordFieldType.RECORD.getRecordDataType(schemaKey)), + new RecordField("value", RecordFieldType.RECORD.getRecordDataType(schemaValue)))); + + final String schemaRegistryId = "schema-registry"; + final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry(); + schemaRegistry.addSchema("schemaKey", schemaKey); + schemaRegistry.addSchema("schemaValue", schemaValue); + schemaRegistry.addSchema("schemaRecord", schemaRecord); + + final String readerId = "record-reader"; + final RecordReaderFactory readerService = new JsonTreeReader(); + + final Map<String, String> propertiesReaderService = new TreeMap<>(); + + final String writerId = "record-writer"; + final RecordSetWriterFactory writerService = new AvroRecordSetWriter(); + final String keyWriterId = "record-key-writer"; + final RecordSetWriterFactory keyWriterService = new AvroRecordSetWriter(); + + final Map<String, String> propertiesWriterService = new TreeMap<>(); + propertiesWriterService.put(schemaRegistryId, schemaRegistryId); + propertiesWriterService.put("schema-access-strategy", "schema-name"); + propertiesWriterService.put("schema-name", "schemaValue"); + final Map<String, String> propertiesWriterServiceKey = new TreeMap<>(); + propertiesWriterServiceKey.put(schemaRegistryId, schemaRegistryId); + propertiesWriterServiceKey.put("schema-access-strategy", "schema-name"); + propertiesWriterServiceKey.put("schema-name", "schemaKey"); + + final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() { + @Override + protected PublisherPool createPublisherPool(final ProcessContext context) { + return getPublisherPool(producedRecords, context); + } + }; + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setValidateExpressionUsage(false); + + runner.addControllerService(schemaRegistryId, schemaRegistry); + runner.enableControllerService(schemaRegistry); + + runner.addControllerService(readerId, readerService, propertiesReaderService); + runner.enableControllerService(readerService); + runner.setProperty(readerId, readerId); + + runner.addControllerService(writerId, writerService, propertiesWriterService); + runner.enableControllerService(writerService); + runner.setProperty(writerId, writerId); + + runner.addControllerService(keyWriterId, keyWriterService, propertiesWriterServiceKey); + runner.enableControllerService(keyWriterService); + runner.setProperty(keyWriterId, keyWriterId); + + return runner; + } + private PublisherPool getPublisherPool(final Collection<ProducerRecord<byte[], byte[]>> producedRecords, final ProcessContext context) { final int maxMessageSize = context.getProperty("max.request.size").asDataSize(DataUnit.B).intValue(); @@ -350,7 +493,7 @@ public class TestPublishKafkaMock { final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex); final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer") .asControllerService(RecordSetWriterFactory.class); - + final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue()); final Producer<byte[], byte[]> producer = mock(ProducerBB.class); when(producer.send(any(), any())).then(invocation -> { final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0); @@ -368,7 +511,7 @@ public class TestPublishKafkaMock { true, patternAttributeName, UTF_8, - null, + publishStrategy, keyWriterFactory); } }