In my case, all the NIFI nodes functioned well during the replication so I
guess this is not my case, but it's a good point to consider.

Thanks Phillip

On Wed, Dec 13, 2023 at 4:20 PM Phillip Lord <phillord0...@gmail.com> wrote:

> Perhaps try following this guidance in the docs???
>
> Consumer Partition Assignment
>
> By default, this processor will subscribe to one or more Kafka topics in
> such a way that the topics to consume from are randomly assigned to the
> nodes in the NiFi cluster. Consider a scenario where a single Kafka topic
> has 8 partitions and the consuming NiFi cluster has 3 nodes. In this
> scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be
> assigned partitions 3, 4, and 5. Node 3 will then be assigned partitions 6
> and 7.
>
> In this scenario, if Node 3 somehow fails or stops pulling data from
> Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
> For most use cases, this is desirable. It provides fault tolerance and
> allows the remaining nodes to pick up the slack. However, there are cases
> where this is undesirable.
>
> One such case is when using NiFi to consume Change Data Capture (CDC) data
> from Kafka. Consider again the above scenario. Consider that Node 3 has
> pulled 1,000 messages from Kafka but has not yet delivered them to their
> final destination. NiFi is then stopped and restarted, and that takes 15
> minutes to complete. In the meantime, Partitions 6 and 7 have been
> reassigned to the other nodes. Those nodes then proceeded to pull data from
> Kafka and deliver it to the desired destination. After 15 minutes, Node 3
> rejoins the cluster and then continues to deliver its 1,000 messages that
> it has already pulled from Kafka to the destination system. Now, those
> records have been delivered out of order.
>
> The solution for this, then, is to assign partitions statically instead of
> dynamically. In this way, we can assign Partitions 6 and 7 to Node 3
> specifically. Then, if Node 3 is restarted, the other nodes will not pull
> data from Partitions 6 and 7. The data will remain queued in Kafka until
> Node 3 is restarted. By using this approach, we can ensure that the data
> that already was pulled can be processed (assuming First In First Out
> Prioritizers are used) before newer messages are handled.
>
> In order to provide a static mapping of node to Kafka partition(s), one or
> more user-defined properties must be added using the naming scheme
> partitions.<hostname> with the value being a comma-separated list of
> Kafka partitions to use. For example, partitions.nifi-01=0, 3, 6, 9, 
> partitions.nifi-02=1,
> 4, 7, 10, and partitions.nifi-03=2, 5, 8, 11. The hostname that is used
> can be the fully qualified hostname, the "simple" hostname, or the IP
> address. There must be an entry for each node in the cluster, or the
> Processor will become invalid. If it is desirable for a node to not have
> any partitions assigned to it, a Property may be added for the hostname
> with an empty string as the value.
>
> NiFi cannot readily validate that all Partitions have been assigned before
> the Processor is scheduled to run. However, it can validate that no
> partitions have been skipped. As such, if partitions 0, 1, and 3 are
> assigned but not partition 2, the Processor will not be valid. However, if
> partitions 0, 1, and 2 are assigned, the Processor will become valid, even
> if there are 4 partitions on the Topic. When the Processor is started, the
> Processor will immediately start to fail, logging errors, and avoid pulling
> any data until the Processor is updated to account for all partitions. Once
> running, if the number of partitions is changed, the Processor will
> continue to run but not pull data from the newly added partitions. Once
> stopped, it will begin to error until all partitions have been assigned.
> Additionally, if partitions that are assigned do not exist (e.g.,
> partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only
> 4 partitions), then the Processor will begin to log errors on startup and
> will not pull data.
>
> In order to use a static mapping of Kafka partitions, the "Topic Name
> Format" must be set to "names" rather than "pattern." Additionally, all
> Topics that are to be consumed must have the same number of partitions. If
> multiple Topics are to be consumed and have a different number of
> partitions, multiple Processors must be used so that each Processor
> consumes only from Topics with the same number of partitions.
>
> On Dec 13, 2023 at 8:59 AM -0500, edi mari <edim2...@gmail.com>, wrote:
>
> Hi Pierre,
> Yes, We tried to use FIFO prioritize in the queue, but it didn't help.
> Some records in the target topic are ordered differently from the source
> topic(which is critical in cleanup policy: compact) .
>
> Edi
>
> On Wed, Dec 13, 2023 at 3:46 PM Pierre Villard <
> pierre.villard...@gmail.com> wrote:
>
>> Hi Edi,
>>
>> Did you try setting the FIFO prioritizer on the connection between the
>> processors?
>>
>> Thanks,
>> Pierre
>>
>> Le mer. 13 déc. 2023 à 14:19, edi mari <edim2...@gmail.com> a écrit :
>>
>>>
>>> Hello ,
>>> I'm using NIFI v1.20.0 to replicate 250 million messages between Kafka
>>> topics.
>>> The problem is that NIFI replicates messages in a non-sequential order,
>>> resulting in the destination topic storing messages differently than the
>>> source topic.
>>>
>>> for example
>>> *source topic - partition 0*
>>> offset:5 key:a value:v1
>>> offset:6 key:a value:v2
>>> offset:7 key:a value:v3
>>>
>>> *destination topic - partition 0*
>>> offset:5 key:a value:v2
>>> offset:6 key:a value:v1
>>> offset:7 key:a value:v3
>>>
>>> The topics are configured with a cleanup policy: compact.
>>>
>>> I'm using ConsumeKafka and PublishKafka processors to replicate topics.
>>>
>>> <image.png>
>>> <image.png>
>>> <image.png>
>>> <image.png>
>>> <image.png>
>>> Thanks
>>> Edi
>>>
>>

Reply via email to