In my case, all the NiFi nodes functioned well during the replication, so I guess that's not the issue here, but it's a good point to consider.
Thanks Phillip

On Wed, Dec 13, 2023 at 4:20 PM Phillip Lord <phillord0...@gmail.com> wrote:

> Perhaps try following this guidance in the docs?
>
> Consumer Partition Assignment
>
> By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5. Node 3 will then be assigned partitions 6 and 7.
>
> In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes. For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases where this is undesirable.
>
> One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3 has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes 15 minutes to complete. In the meantime, partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceed to pull data from Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver the 1,000 messages it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
>
> The solution, then, is to assign partitions statically instead of dynamically. In this way, we can assign partitions 6 and 7 to Node 3 specifically. Then, if Node 3 is restarted, the other nodes will not pull data from partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By using this approach, we can ensure that the data that was already pulled can be processed (assuming First In First Out prioritizers are used) before newer messages are handled.
>
> In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme partitions.<hostname>, with the value being a comma-separated list of Kafka partitions to use. For example: partitions.nifi-01=0, 3, 6, 9, partitions.nifi-02=1, 4, 7, 10, and partitions.nifi-03=2, 5, 8, 11. The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a property may be added for the hostname with an empty string as the value.
>
> NiFi cannot readily validate that all partitions have been assigned before the Processor is scheduled to run. However, it can validate that no partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However, if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the topic. When the Processor is started, it will immediately start to fail, logging errors, and avoid pulling any data until the Processor is updated to account for all partitions.
> Once running, if the number of partitions is changed, the Processor will continue to run but will not pull data from the newly added partitions. Once stopped, it will begin to error until all partitions have been assigned. Additionally, if partitions that are assigned do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the topic has only 4 partitions), then the Processor will begin to log errors on startup and will not pull data.
>
> In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern". Additionally, all topics that are to be consumed must have the same number of partitions. If multiple topics are to be consumed and have a different number of partitions, multiple Processors must be used so that each Processor consumes only from topics with the same number of partitions.
>
> On Dec 13, 2023 at 8:59 AM -0500, edi mari <edim2...@gmail.com>, wrote:
>
> Hi Pierre,
> Yes, we tried using the FIFO prioritizer on the queue, but it didn't help. Some records in the target topic are ordered differently from the source topic (which is critical with cleanup policy: compact).
>
> Edi
>
> On Wed, Dec 13, 2023 at 3:46 PM Pierre Villard <pierre.villard...@gmail.com> wrote:
>
>> Hi Edi,
>>
>> Did you try setting the FIFO prioritizer on the connection between the processors?
>>
>> Thanks,
>> Pierre
>>
>> On Wed, Dec 13, 2023 at 14:19, edi mari <edim2...@gmail.com> wrote:
>>
>>> Hello,
>>> I'm using NiFi v1.20.0 to replicate 250 million messages between Kafka topics.
>>> The problem is that NiFi replicates messages in a non-sequential order, resulting in the destination topic storing messages differently than the source topic.
>>>
>>> For example:
>>>
>>> *source topic - partition 0*
>>> offset:5 key:a value:v1
>>> offset:6 key:a value:v2
>>> offset:7 key:a value:v3
>>>
>>> *destination topic - partition 0*
>>> offset:5 key:a value:v2
>>> offset:6 key:a value:v1
>>> offset:7 key:a value:v3
>>>
>>> The topics are configured with cleanup policy: compact.
>>>
>>> I'm using ConsumeKafka and PublishKafka processors to replicate topics.
>>>
>>> <image.png>
>>> <image.png>
>>> <image.png>
>>> <image.png>
>>> <image.png>
>>>
>>> Thanks
>>> Edi
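For readers less familiar with the distinction the quoted NiFi documentation is drawing, here is a minimal sketch (not from the thread) using the plain Kafka Java client. It contrasts dynamic assignment via subscribe(), which is what ConsumeKafka does by default, with static assignment via assign(), which is the behavior the partitions.<hostname> properties express. The broker address, topic name, and partition numbers are placeholders chosen for illustration; NiFi manages this internally once the processor properties are configured.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class StaticAssignmentSketch {
        public static void main(String[] args) {
            // Placeholder connection settings for the sketch.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "replication-example");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Dynamic assignment (ConsumeKafka's default): the group
                // coordinator decides which partitions this consumer gets and
                // may reassign them to other members if this one drops out.
                // consumer.subscribe(List.of("source-topic"));

                // Static assignment (what partitions.<hostname> expresses):
                // this consumer owns partitions 6 and 7 only, and no group
                // rebalance will ever hand them to another consumer.
                consumer.assign(List.of(
                        new TopicPartition("source-topic", 6),
                        new TopicPartition("source-topic", 7)));

                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(),
                            record.key(), record.value());
                }
            }
        }
    }

With static assignment, a restart of the node that owns partitions 6 and 7 leaves those partitions untouched until it returns, which is what prevents the interleaved, out-of-order delivery that breaks per-key ordering on a compacted destination topic.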