exceptionfactory commented on code in PR #10538: URL: https://github.com/apache/nifi/pull/10538#discussion_r2558557142
########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/consumer/TestConsumerPartitionsUtil.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kafka.processors.consumer; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestConsumerPartitionsUtil { + private final ComponentLog logger = mock(); + private String hostname; + + @BeforeEach + public void setup() throws UnknownHostException { + 
hostname = InetAddress.getLocalHost().getHostName(); + } + + @Test + public void testNoPartitionAssignments() throws UnknownHostException { + final Map<String, String> properties = Collections.singletonMap("key", "value"); Review Comment: ```suggestion final Map<String, String> properties = Map.of("key", "value"); ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Subscription.java: ########## @@ -31,19 +31,22 @@ public class Subscription { private final String groupId; private final Collection<String> topics; + private final Integer partition; private final Pattern topicPattern; private final AutoOffsetReset autoOffsetReset; - public Subscription(final String groupId, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) { + public Subscription(final String groupId, final Integer partition, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) { Review Comment: Very minor, but I recommend placing `partition` after `topics` to align with general hierarchy: ```suggestion public Subscription(final String groupId, final Collection<String> topics, final Integer partition, final AutoOffsetReset autoOffsetReset) { ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/ConsumerPartitionsUtil.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kafka.processors.consumer; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ConsumerPartitionsUtil { + public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions."; + + public static int[] getPartitionsForHost(final Map<String, String> properties, final ComponentLog logger) throws UnknownHostException { + final Map<String, String> hostnameToPartitionString = mapHostnamesToPartitionStrings(properties); + final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionString); + + if (partitionsByHost.isEmpty()) { + // Explicit partitioning is not enabled. + logger.debug("No explicit Consumer Partitions have been declared."); + return null; + } + + logger.info("Found the following mapping of hosts to partitions: {}", hostnameToPartitionString); Review Comment: This seems more appropriate as a debug level message. ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/consumer/TestConsumerPartitionsUtil.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kafka.processors.consumer; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestConsumerPartitionsUtil { Review Comment: Minor note, the `public` modifiers on the class and method level are not necessary for JUnit 5. Since this is a new test class, recommend removing them. 
########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); final PollingContext pollingContext = createPollingContext(context); - final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext); - final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() - .verificationStepName("Verify Topic Partitions"); - - try { - final List<PartitionState> partitionStates = consumerService.getPartitionStates(); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) - .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); - } catch (final Exception e) { - getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.FAILED) - .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + final Collection<String> topics = pollingContext.getTopics(); + if (!topics.isEmpty()) { + try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) { + final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() + .verificationStepName("Verify Topic Partitions"); + + try { + final List<PartitionState> partitionStates = consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList()); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .explanation(String.format("Partitions [%d] found for Topics %s", 
partitionStates.size(), pollingContext.getTopics())); + } catch (final Exception e) { + getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.FAILED) + .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + } + verificationResults.add(verificationPartitions.build()); + } catch (IOException e) { + getLogger().warn("Couldn't close KafkaConsumerService after verification.", e); + } } - verificationResults.add(verificationPartitions.build()); return verificationResults; } private KafkaConsumerService getConsumerService(final ProcessContext context) { + recreatePartitionedConsumerServices(); + final KafkaConsumerService consumerService = consumerServices.poll(); if (consumerService != null) { return consumerService; } - final int activeCount = activeConsumerCount.incrementAndGet(); - if (activeCount > getMaxConsumerCount()) { - getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount()); - activeConsumerCount.decrementAndGet(); + final boolean isExplicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties()); + + if (isExplicitPartitionMapping) { + getLogger().trace("No Partitioned Kafka Consumer Service available, all specified partitions are being consumed from."); return null; + } else { + final int activeCount = activeConsumerCount.incrementAndGet(); + if (activeCount > getMaxConsumerCount()) { + getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount()); + activeConsumerCount.decrementAndGet(); + return null; + } + + getLogger().info("No Kafka Consumer Service available; creating a new one. 
Active count: {}", activeCount); + return connectionService.getConsumerService(pollingContext); } + } - getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount); - final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); - return connectionService.getConsumerService(pollingContext); + private int getPartitionCount(final KafkaConnectionService connectionService) { + Collection<String> topics = this.pollingContext.getTopics(); + + if (topics.isEmpty()) { + return -1; + } + + int partitionsEachTopic = 0; + try (KafkaConsumerService kafkaConsumerService = connectionService.getConsumerService(this.pollingContext)) { + Map<String, List<PartitionState>> topicToPartitionStates = kafkaConsumerService.getPartitionStatesByTopic(); + for (List<PartitionState> partitionStatesForTopic : topicToPartitionStates.values()) { + final int partitionsThisTopic = partitionStatesForTopic.size(); + if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) { + throw new IllegalStateException("The specific topic names do not have the same number of partitions"); Review Comment: The topic names should be included in the message for troubleshooting. ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/consumer/TestConsumerPartitionsUtil.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kafka.processors.consumer; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestConsumerPartitionsUtil { + private final ComponentLog logger = mock(); + private String hostname; + + @BeforeEach + public void setup() throws UnknownHostException { + hostname = InetAddress.getLocalHost().getHostName(); + } + + @Test + public void testNoPartitionAssignments() throws UnknownHostException { Review Comment: `UnknownHostException` does not appear to be thrown in this and other methods. 
########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -389,12 +474,18 @@ public void onStopped() { while ((service = consumerServices.poll()) != null) { close(service, "Processor stopped"); } + + availablePartitionedPollingContexts.clear(); + consumerServiceToPartitionedPollingContext.clear(); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { final KafkaConsumerService consumerService = getConsumerService(context); + final PollingContext pollingContext = Optional.ofNullable(consumerServiceToPartitionedPollingContext.get(consumerService)) Review Comment: Creating an `Optional` wrapper is not necessary, and does not add sufficient value for running on every invocation of `onTrigger`, so I recommend replacing with a standard conditional. ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); final PollingContext pollingContext = createPollingContext(context); - final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext); - final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() - .verificationStepName("Verify Topic Partitions"); - - try { - final List<PartitionState> partitionStates = consumerService.getPartitionStates(); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) - .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); - } catch (final Exception e) { - getLogger().error("Topics {} Partition verification 
failed", pollingContext.getTopics(), e); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.FAILED) - .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + final Collection<String> topics = pollingContext.getTopics(); + if (!topics.isEmpty()) { + try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) { + final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() + .verificationStepName("Verify Topic Partitions"); + + try { + final List<PartitionState> partitionStates = consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList()); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); + } catch (final Exception e) { + getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); Review Comment: This is carried over from the previous implementation, but on review, the Verification Logger instance should be used instead of `getLogger()`. 
########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -379,6 +437,33 @@ public void onScheduled(final ProcessContext context) { if (maxUncommittedSizeConfigured) { maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue(); } + + if (ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties())) { + final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties()); + final int partitionCount = getPartitionCount(connectionService); + + if (partitionCount != numAssignedPartitions) { + context.yield(); + + throw new ProcessException("Illegal Partition Assignment: There are " + + numAssignedPartitions + " partitions statically assigned using the " + PARTITIONS_PROPERTY_PREFIX + ".* property names," + + " but the Kafka topic(s) have " + partitionCount + " partitions"); + } + + final int[] assignedPartitions; + try { + assignedPartitions = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger()); + } catch (final UnknownHostException uhe) { + throw new ProcessException("Could not determine localhost's hostname", uhe); Review Comment: ```suggestion throw new ProcessException("Failed to resolve local host address", uhe); ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); final PollingContext pollingContext = createPollingContext(context); - final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext); - final ConfigVerificationResult.Builder verificationPartitions = new 
ConfigVerificationResult.Builder() - .verificationStepName("Verify Topic Partitions"); - - try { - final List<PartitionState> partitionStates = consumerService.getPartitionStates(); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) - .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); - } catch (final Exception e) { - getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.FAILED) - .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + final Collection<String> topics = pollingContext.getTopics(); + if (!topics.isEmpty()) { + try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) { + final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() + .verificationStepName("Verify Topic Partitions"); + + try { + final List<PartitionState> partitionStates = consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList()); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); + } catch (final Exception e) { + getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.FAILED) + .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + } + verificationResults.add(verificationPartitions.build()); + } catch (IOException e) { + getLogger().warn("Couldn't close KafkaConsumerService after verification.", e); + } } - verificationResults.add(verificationPartitions.build()); return verificationResults; } private 
KafkaConsumerService getConsumerService(final ProcessContext context) { + recreatePartitionedConsumerServices(); + final KafkaConsumerService consumerService = consumerServices.poll(); if (consumerService != null) { return consumerService; } - final int activeCount = activeConsumerCount.incrementAndGet(); - if (activeCount > getMaxConsumerCount()) { - getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount()); - activeConsumerCount.decrementAndGet(); + final boolean isExplicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties()); + + if (isExplicitPartitionMapping) { + getLogger().trace("No Partitioned Kafka Consumer Service available, all specified partitions are being consumed from."); return null; + } else { + final int activeCount = activeConsumerCount.incrementAndGet(); + if (activeCount > getMaxConsumerCount()) { + getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount()); + activeConsumerCount.decrementAndGet(); + return null; + } + + getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount); + return connectionService.getConsumerService(pollingContext); } + } - getLogger().info("No Kafka Consumer Service available; creating a new one. 
Active count: {}", activeCount); + return connectionService.getConsumerService(pollingContext); } + } - getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount); - final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); - return connectionService.getConsumerService(pollingContext); + private int getPartitionCount(final KafkaConnectionService connectionService) { + Collection<String> topics = this.pollingContext.getTopics(); + + if (topics.isEmpty()) { + return -1; + } + + int partitionsEachTopic = 0; + try (KafkaConsumerService kafkaConsumerService = connectionService.getConsumerService(this.pollingContext)) { + Map<String, List<PartitionState>> topicToPartitionStates = kafkaConsumerService.getPartitionStatesByTopic(); + for (List<PartitionState> partitionStatesForTopic : topicToPartitionStates.values()) { + final int partitionsThisTopic = partitionStatesForTopic.size(); + if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) { + throw new IllegalStateException("The specific topic names do not have the same number of partitions"); + } + + partitionsEachTopic = partitionsThisTopic; + } + } catch (IOException e) { + getLogger().warn("Couldn't close KafkaConsumerService after partition assignment check.", e); Review Comment: It is best to avoid contractions and avoid ending with `.` characters in messages: ```suggestion getLogger().warn("Failed to close KafkaConsumerService after partition assignment check", e); ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); final PollingContext pollingContext = createPollingContext(context); - final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext); - final 
ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() - .verificationStepName("Verify Topic Partitions"); - - try { - final List<PartitionState> partitionStates = consumerService.getPartitionStates(); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) - .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); - } catch (final Exception e) { - getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.FAILED) - .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + final Collection<String> topics = pollingContext.getTopics(); + if (!topics.isEmpty()) { + try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) { + final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() + .verificationStepName("Verify Topic Partitions"); + + try { + final List<PartitionState> partitionStates = consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList()); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); + } catch (final Exception e) { + getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.FAILED) + .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + } + verificationResults.add(verificationPartitions.build()); + } catch (IOException e) { + getLogger().warn("Couldn't close KafkaConsumerService after verification.", e); Review Comment: ```suggestion 
getLogger().warn("Failed to close KafkaConsumerService after verification", e); ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -513,43 +609,95 @@ public List<ConfigVerificationResult> verify(final ProcessContext context, final final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); final PollingContext pollingContext = createPollingContext(context); - final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext); - final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() - .verificationStepName("Verify Topic Partitions"); - - try { - final List<PartitionState> partitionStates = consumerService.getPartitionStates(); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) - .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); - } catch (final Exception e) { - getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.FAILED) - .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + final Collection<String> topics = pollingContext.getTopics(); + if (!topics.isEmpty()) { + try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) { + final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() + .verificationStepName("Verify Topic Partitions"); + + try { + final List<PartitionState> partitionStates = consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList()); + verificationPartitions + 
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics())); + } catch (final Exception e) { + getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.FAILED) + .explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e)); + } + verificationResults.add(verificationPartitions.build()); + } catch (IOException e) { + getLogger().warn("Couldn't close KafkaConsumerService after verification.", e); + } } - verificationResults.add(verificationPartitions.build()); return verificationResults; } private KafkaConsumerService getConsumerService(final ProcessContext context) { + recreatePartitionedConsumerServices(); + final KafkaConsumerService consumerService = consumerServices.poll(); if (consumerService != null) { return consumerService; } - final int activeCount = activeConsumerCount.incrementAndGet(); - if (activeCount > getMaxConsumerCount()) { - getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount()); - activeConsumerCount.decrementAndGet(); + final boolean isExplicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties()); + + if (isExplicitPartitionMapping) { + getLogger().trace("No Partitioned Kafka Consumer Service available, all specified partitions are being consumed from."); return null; + } else { + final int activeCount = activeConsumerCount.incrementAndGet(); + if (activeCount > getMaxConsumerCount()) { + getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount()); + activeConsumerCount.decrementAndGet(); + return null; + } + + getLogger().info("No Kafka Consumer Service available; 
creating a new one. Active count: {}", activeCount); + return connectionService.getConsumerService(pollingContext); } + } - getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount); - final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); - return connectionService.getConsumerService(pollingContext); + private int getPartitionCount(final KafkaConnectionService connectionService) { + Collection<String> topics = this.pollingContext.getTopics(); + + if (topics.isEmpty()) { + return -1; + } + + int partitionsEachTopic = 0; + try (KafkaConsumerService kafkaConsumerService = connectionService.getConsumerService(this.pollingContext)) { + Map<String, List<PartitionState>> topicToPartitionStates = kafkaConsumerService.getPartitionStatesByTopic(); + for (List<PartitionState> partitionStatesForTopic : topicToPartitionStates.values()) { + final int partitionsThisTopic = partitionStatesForTopic.size(); + if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) { + throw new IllegalStateException("The specific topic names do not have the same number of partitions"); + } + + partitionsEachTopic = partitionsThisTopic; + } + } catch (IOException e) { + getLogger().warn("Couldn't close KafkaConsumerService after partition assignment check.", e); + } + + return partitionsEachTopic; + } + + private void recreatePartitionedConsumerServices() { + PollingContext partitionedPollingContext; + while ((partitionedPollingContext = availablePartitionedPollingContexts.poll()) != null) { + getLogger().info("Creating new Partitioned Kafka Consumer Service."); Review Comment: Some identifying information, such as the Partition, should be included in the log message. 
########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -379,6 +437,33 @@ public void onScheduled(final ProcessContext context) { if (maxUncommittedSizeConfigured) { maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue(); } + + if (ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties())) { + final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties()); + final int partitionCount = getPartitionCount(connectionService); + + if (partitionCount != numAssignedPartitions) { + context.yield(); + + throw new ProcessException("Illegal Partition Assignment: There are " + + numAssignedPartitions + " partitions statically assigned using the " + PARTITIONS_PROPERTY_PREFIX + ".* property names," + + " but the Kafka topic(s) have " + partitionCount + " partitions"); + } Review Comment: Recommend using formatted strings instead of concatenation for this message. ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/ConsumerPartitionsUtil.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.kafka.processors.consumer; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ConsumerPartitionsUtil { + public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions."; + + public static int[] getPartitionsForHost(final Map<String, String> properties, final ComponentLog logger) throws UnknownHostException { + final Map<String, String> hostnameToPartitionString = mapHostnamesToPartitionStrings(properties); + final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionString); + + if (partitionsByHost.isEmpty()) { + // Explicit partitioning is not enabled. + logger.debug("No explicit Consumer Partitions have been declared."); + return null; + } + + logger.info("Found the following mapping of hosts to partitions: {}", hostnameToPartitionString); + + // Determine the partitions based on hostname/IP. 
+ int[] partitionsForThisHost = getPartitionsForThisHost(partitionsByHost); + if (partitionsForThisHost == null) { + throw new IllegalArgumentException("Could not find a partition mapping for host " + InetAddress.getLocalHost().getCanonicalHostName()); + } + + return partitionsForThisHost; + } + + private static Map<String, int[]> mapPartitionValueToIntArrays(final Map<String, String> partitionValues) { + final Map<String, int[]> partitionsByHost = new HashMap<>(); + for (final Map.Entry<String, String> entry : partitionValues.entrySet()) { + final String host = entry.getKey(); + final int[] partitions = parsePartitions(host, entry.getValue()); + partitionsByHost.put(host, partitions); + } + + return partitionsByHost; + } + + private static int[] getPartitionsForThisHost(final Map<String, int[]> partitionsByHost) throws UnknownHostException { + // Determine the partitions based on hostname/IP. + final InetAddress localhost = InetAddress.getLocalHost(); + int[] partitionsForThisHost = partitionsByHost.get(localhost.getCanonicalHostName()); + if (partitionsForThisHost != null) { + return partitionsForThisHost; + } + + partitionsForThisHost = partitionsByHost.get(localhost.getHostName()); + if (partitionsForThisHost != null) { + return partitionsForThisHost; + } + + return partitionsByHost.get(localhost.getHostAddress()); + } + + private static Map<String, String> mapHostnamesToPartitionStrings(final Map<String, String> properties) { + final Map<String, String> hostnameToPartitionString = new HashMap<>(); + for (final Map.Entry<String, String> entry : properties.entrySet()) { + final String propertyName = entry.getKey(); + if (!propertyName.startsWith(PARTITION_PROPERTY_NAME_PREFIX)) { + continue; + } + + if (propertyName.length() <= PARTITION_PROPERTY_NAME_PREFIX.length()) { + continue; + } + + final String propertyNameAfterPrefix = propertyName.substring(PARTITION_PROPERTY_NAME_PREFIX.length()); + hostnameToPartitionString.put(propertyNameAfterPrefix, 
entry.getValue()); + } + + return hostnameToPartitionString; + } + + private static int[] parsePartitions(final String hostname, final String propertyValue) { + final String[] splits = propertyValue.split(","); + final List<Integer> partitionList = new ArrayList<>(); + for (final String split : splits) { + if (split.isBlank()) { + continue; + } + + try { + final int partition = Integer.parseInt(split.trim()); + if (partition < 0) { + throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is negative"); + } + + partitionList.add(partition); + } catch (final NumberFormatException nfe) { + throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is not an integer"); + } + } + + // Map out List<Integer> to int[] + return partitionList.stream().mapToInt(Integer::intValue).toArray(); + } + + public static ValidationResult validateConsumePartitions(final Map<String, String> properties) { + final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties); + if (hostnameToPartitionMapping.isEmpty()) { + // Partitions are not being explicitly assigned. 
+ return new ValidationResult.Builder().valid(true).build(); + } + + final Set<Integer> partitionsClaimed = new HashSet<>(); + final Set<Integer> duplicatePartitions = new HashSet<>(); + for (final Map.Entry<String, String> entry : hostnameToPartitionMapping.entrySet()) { + final int[] partitions = parsePartitions(entry.getKey(), entry.getValue()); + for (final int partition : partitions) { + final boolean added = partitionsClaimed.add(partition); + if (!added) { + duplicatePartitions.add(partition); + } + } + } + + final List<Integer> partitionsMissing = new ArrayList<>(); + for (int i = 0; i < partitionsClaimed.size(); i++) { + if (!partitionsClaimed.contains(i)) { + partitionsMissing.add(i); + } + } + + if (!partitionsMissing.isEmpty()) { + return new ValidationResult.Builder() + .subject("Partitions") + .input(partitionsClaimed.toString()) + .valid(false) + .explanation("The following partitions were not mapped to any node: " + partitionsMissing.toString()) + .build(); + } + + if (!duplicatePartitions.isEmpty()) { + return new ValidationResult.Builder() + .subject("Partitions") + .input(partitionsClaimed.toString()) + .valid(false) + .explanation("The following partitions were mapped to multiple nodes: " + duplicatePartitions.toString()) + .build(); + } + + final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionMapping); + final int[] partitionsForThisHost; + try { + partitionsForThisHost = getPartitionsForThisHost(partitionsByHost); + } catch (UnknownHostException e) { + return new ValidationResult.Builder() + .valid(false) + .subject("Partition Assignment") + .explanation("Unable to determine hostname of localhost") + .build(); + } + + if (partitionsForThisHost == null) { + return new ValidationResult.Builder() + .subject("Partition Assignment") + .valid(false) + .explanation("No assignment was given for this host") + .build(); + } + + return new ValidationResult.Builder().valid(true).build(); + } + + public static boolean 
isPartitionAssignmentExplicit(final Map<String, String> properties) { + final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties); + return !hostnameToPartitionMapping.isEmpty(); + } + + public static int getPartitionAssignmentCount(final Map<String, String> properties) { Review Comment: Recommend moving all public methods before all private methods in this class. ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ########## @@ -617,6 +765,10 @@ private void processInputFlowFile(final ProcessSession session, final OffsetTrac } private PollingContext createPollingContext(final ProcessContext context) { + return createPollingContext(context, null); + } + + private PollingContext createPollingContext(final ProcessContext context, Integer partition) { Review Comment: ```suggestion private PollingContext createPollingContext(final ProcessContext context, final Integer partition) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
