This is an automated email from the ASF dual-hosted git repository.

rmani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new a54618087 RANGER-4349: AtlasTagSource.commitToKafka() should commit 
the offset to the topic from which the event came from.
a54618087 is described below

commit a5461808742ef0f40410f326fdf236619e81ca4f
Author: szymonorz <szy...@proton.me>
AuthorDate: Wed Aug 9 21:57:27 2023 +0200

    RANGER-4349: AtlasTagSource.commitToKafka() should commit the offset to the 
topic from which the event came from.
    
    Signed-off-by: szymonorz <szy...@proton.me>
    Signed-off-by: Ramesh Mani <rm...@apache.org>
---
 .../java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 071f52c4a..9a761b33c 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -316,7 +316,7 @@ public class AtlasTagSource extends AbstractTagSource {
                        int  partitionId   = messageToCommit.getPartition();
 
                        if (offsetOfLastMessageCommittedToKafka < 
messageOffset) {
-                               TopicPartition partition = new 
TopicPartition("ATLAS_ENTITIES", partitionId);
+                               TopicPartition partition = new 
TopicPartition(messageToCommit.getTopic(), partitionId);
                                try {
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Committing message 
with offset:[" + messageOffset + "] to Kafka");

Reply via email to