[ https://issues.apache.org/jira/browse/NIFI-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15549945#comment-15549945 ]
ASF GitHub Bot commented on NIFI-2865:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1097#discussion_r82070354

--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---
@@ -250,242 +241,141 @@

     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
+        return PROPERTIES;
     }

     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
-                .build();
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName)
+                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+                .dynamic(true)
+                .build();
     }

     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         return KafkaProcessorUtils.validateCommonProperties(validationContext);
     }

-    volatile KafkaPublisher kafkaPublisher;
-
-    /**
-     * This thread-safe operation will delegate to
-     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
-     * checking and creating (if necessary) Kafka resource which could be either
-     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
-     * destroy the underlying Kafka resource upon catching an {@link Exception}
-     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
-     * After Kafka resource is destroyed it will be re-created upon the next
-     * invocation of this operation essentially providing a self healing
-     * mechanism to deal with potentially corrupted resource.
-     * <p>
-     * Keep in mind that upon catching an exception the state of this processor
-     * will be set to no longer accept any more tasks, until Kafka resource is
-     * reset. This means that in a multi-threaded situation currently executing
-     * tasks will be given a chance to complete while no new tasks will be
-     * accepted.
-     *
-     * @param context context
-     * @param sessionFactory factory
-     */
-    @Override
-    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted.
-            this.taskCounter.incrementAndGet();
-            final ProcessSession session = sessionFactory.createSession();
-            try {
-                /*
-                 * We can't be doing double null check here since as a pattern
-                 * it only works for lazy init but not reset, which is what we
-                 * are doing here. In fact the first null check is dangerous
-                 * since 'kafkaPublisher' can become null right after its null
-                 * check passed causing subsequent NPE.
-                 */
-                synchronized (this) {
-                    if (this.kafkaPublisher == null) {
-                        this.kafkaPublisher = this.buildKafkaResource(context, session);
-                    }
-                }
-
-                /*
-                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
-                 * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated
-                 * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile
-                 */
-                boolean processed = this.rendezvousWithKafka(context, session);
-                session.commit();
-                if (!processed) {
-                    context.yield();
-                }
-            } catch (Throwable e) {
-                this.acceptTask = false;
-                session.rollback(true);
-                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e});
-            } finally {
-                synchronized (this) {
-                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
-                        this.close();
-                        this.acceptTask = true;
-                    }
-                }
-            }
-        } else {
-            this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            context.yield();
+    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+        PublisherPool pool = publisherPool;
+        if (pool != null) {
+            return pool;
         }
+
+        return publisherPool = createPublisherPool(context);
+    }
+
+    protected PublisherPool createPublisherPool(final ProcessContext context) {
+        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
     }

-    /**
-     * Will call {@link Closeable#close()} on the target resource after which
-     * the target resource will be set to null. Should only be called when there
-     * are no more threads being executed on this processor or when it has been
-     * verified that only a single thread remains.
-     *
-     * @see KafkaPublisher
-     * @see KafkaConsumer
-     */
     @OnStopped
-    public void close() {
-        try {
-            if (this.kafkaPublisher != null) {
-                try {
-                    this.kafkaPublisher.close();
-                } catch (Exception e) {
-                    this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e);
-                }
-            }
-        } finally {
-            this.kafkaPublisher = null;
+    public void closePool() {
+        if (publisherPool != null) {
+            publisherPool.close();
         }
+
+        publisherPool = null;
     }

-    /**
-     * Will rendezvous with Kafka if {@link ProcessSession} contains
-     * {@link FlowFile} producing a result {@link FlowFile}.
-     * <br>
-     * The result {@link FlowFile} that is successful is then transfered to
-     * {@link #REL_SUCCESS}
-     * <br>
-     * The result {@link FlowFile} that is failed is then transfered to
-     * {@link #REL_FAILURE}
-     *
-     */
-    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            long start = System.nanoTime();
-            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
-            Relationship relationship = REL_SUCCESS;
-            if (!this.isFailedFlowFile(flowFile)) {
-                String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-                long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic);
-                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration);
-                this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis",
-                        new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration});
-            } else {
-                relationship = REL_FAILURE;
-                flowFile = session.penalize(flowFile);
-            }
-            session.transfer(flowFile, relationship);
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
+
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
+        if (flowFiles.isEmpty()) {
+            return;
         }
-        return flowFile != null;
-    }

-    /**
-     * Builds and instance of {@link KafkaPublisher}.
-     */
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
-        final Map<String, String> kafkaProps = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
-        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
-        this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        final Properties props = new Properties();
-        props.putAll(kafkaProps);
-        KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
-        return publisher;
-    }
+        final PublisherPool pool = getPublisherPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }

-    /**
-     * Will rendezvous with {@link KafkaPublisher} after building
-     * {@link PublishingContext} and will produce the resulting
-     * {@link FlowFile}. The resulting FlowFile contains all required
-     * information to determine if message publishing originated from the
-     * provided FlowFile has actually succeeded fully, partially or failed
-     * completely (see {@link #isFailedFlowFile(FlowFile)}.
-     */
-    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
-        final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream contentStream) throws IOException {
-                PublishingContext publishingContext = PublishKafka.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisher.KafkaPublisherResult result = PublishKafka.this.kafkaPublisher.publish(publishingContext);
-                publishResultRef.set(result);
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+
+        final long startTime = System.nanoTime();
+        try (final PublisherLease lease = pool.obtainPublisher()) {
+            // Send each FlowFile to Kafka asynchronously.
+            for (final FlowFile flowFile : flowFiles) {
+                if (!isScheduled()) {
+                    // If stopped, re-queue FlowFile instead of sending it
+                    session.transfer(flowFile);
+                    continue;
+                }
+
+                final byte[] messageKey = getMessageKey(flowFile, context);
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                final byte[] demarcatorBytes;
+                if (useDemarcator) {
+                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+                } else {
+                    demarcatorBytes = null;
+                }
+
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        try (final InputStream in = new BufferedInputStream(rawIn)) {
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+                        }
+                    }
+                });
             }
-        });
-        FlowFile resultFile = publishResultRef.get().isAllAcked()
-                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
-                : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));
+            // Complete the send
+            final PublishResult publishResult = lease.complete();

-        if (!this.isFailedFlowFile(resultFile)) {
-            resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent()));
-        }
-        return resultFile;
-    }
+            // Transfer any successful FlowFiles.
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();

-    /**
-     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
-     * {@link PublishingContext} contains all contextual information required by
-     * {@link KafkaPublisher} to publish to Kafka. Such information contains
-     * things like topic name, content stream, delimiter, key and last ACKed
-     * message for cases where provided FlowFile is being retried (failed in the
-     * past).
-     * <br>
-     * For the clean FlowFile (file that has been sent for the first time),
-     * PublishingContext will be built form {@link ProcessContext} associated
-     * with this invocation.
-     * <br>
-     * For the failed FlowFile, {@link PublishingContext} will be built from
-     * attributes of that FlowFile which by then will already contain required
-     * information (e.g., topic, key, delimiter etc.). This is required to
-     * ensure the affinity of the retry in the even where processor
-     * configuration has changed. However keep in mind that failed FlowFile is
-     * only considered a failed FlowFile if it is being re-processed by the same
-     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
-     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to
-     * another PublishKafka0_10 processor it is treated as a fresh FlowFile
-     * regardless if it has #FAILED* attributes set.
-     */
-    private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        final byte[] keyBytes = getMessageKey(flowFile, context);
-
-        final String topicName;
-        final byte[] delimiterBytes;
-        int lastAckedMessageIndex = -1;
-        if (this.isFailedFlowFile(flowFile)) {
-            lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
-            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-        } else {
-            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
-                    .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
-        }
+                final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                session.adjustCounter("Messages Sent", msgCount, true);
+
+                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                session.transfer(success, REL_SUCCESS);
+            }
+
+            // Transfer any failures.
+            for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
+                final int successCount = publishResult.getSuccessfulMessageCount(failure);
+                if (successCount > 0) {
+                    getLogger().error("Failed to send all message for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
--- End diff --

Same as other comment in the PublishKafka_0_10


> Address issues of PublishKafka blocking when having trouble communicating with Kafka broker and improve performance
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-2865
>                 URL: https://issues.apache.org/jira/browse/NIFI-2865
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.1.0
>
>
> When NiFi is unable to communicate properly with the Kafka broker, we see the NiFi threads occasionally block. This should be resolvable by calling the wakeup() method of the client. Additionally, if Kafka takes too long to respond, we should be able to route the FlowFile to failure and move on.
> PublishKafka has a nice feature that allows a demarcated stream to be sent as separate messages, so that a large number of messages can be sent as a single FlowFile. However, in the case of individual messages per FlowFile, the performance could be improved by batching together multiple FlowFiles per session



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
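Note on the blocking concern described in the issue above: it amounts to waiting indefinitely for broker acknowledgements. The standalone sketch below illustrates, with a plain Kafka producer, the idea of bounding that wait so the caller can route a FlowFile to failure instead of blocking. It is not the PR's implementation (the PR routes sends through the PublisherPool/PublisherLease shown in the diff, with the wait bounded by the processor's ACK_WAIT_TIME property); the broker address and topic name are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class BoundedAckSketch {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous and returns a Future for the broker acknowledgement.
            final Future<RecordMetadata> ack =
                    producer.send(new ProducerRecord<>("demo-topic", "hello".getBytes(StandardCharsets.UTF_8)));

            try {
                // Wait only a bounded amount of time for the ack. In processor terms, a
                // timeout here is where the FlowFile would be routed to failure rather
                // than the onTrigger thread blocking indefinitely.
                final RecordMetadata metadata = ack.get(5, TimeUnit.SECONDS);
                System.out.println("Acknowledged at offset " + metadata.offset());
            } catch (final TimeoutException e) {
                System.err.println("Broker did not acknowledge in time; would route to failure");
            }
        }
    }
}

The batching half of the description is visible directly in the new onTrigger in the diff: session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500)) pulls up to 500 FlowFiles (or roughly 250 KB of content) into a single session instead of handling one FlowFile per invocation.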