[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623476#comment-16623476 ]
ASF GitHub Bot commented on NIFI-4914: -------------------------------------- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218858965 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor<T> extends AbstractProcessor { + + static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); + static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", 
"Multiple consumer will be able to use the same subscription name and the messages"); + static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " + + "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + + static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", + "Mark the message as consumed despite being unable to decrypt the contents"); + static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", + "Discard the message and don't perform any addtional processing on the message"); + static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", + "Report a failure condition, and the route the message contents to the FAILED relationship."); --- End diff -- typo: "and the route" should read "and then route", i.e. "Report a failure condition, and then route the message contents to the FAILED relationship." > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > ------------------------------------------------------------------------------------------ > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Affects Versions: 1.6.0 > Reporter: David Kjerrumgaard > Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)