This is an automated email from the ASF dual-hosted git repository. rmani pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
     new a54618087 RANGER-4349: AtlasTagSource.commitToKafka() should commit the offset to the topic from which the event came from.
a54618087 is described below

commit a5461808742ef0f40410f326fdf236619e81ca4f
Author: szymonorz <szy...@proton.me>
AuthorDate: Wed Aug 9 21:57:27 2023 +0200

    RANGER-4349: AtlasTagSource.commitToKafka() should commit the offset to the topic from which the event came from.

    Signed-off-by: szymonorz <szy...@proton.me>
    Signed-off-by: Ramesh Mani <rm...@apache.org>
---
 .../java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 071f52c4a..9a761b33c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -316,7 +316,7 @@ public class AtlasTagSource extends AbstractTagSource {
 				int partitionId = messageToCommit.getPartition();
 
 				if (offsetOfLastMessageCommittedToKafka < messageOffset) {
-					TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", partitionId);
+					TopicPartition partition = new TopicPartition(messageToCommit.getTopic(), partitionId);
 					try {
 						if (LOG.isDebugEnabled()) {
 							LOG.debug("Committing message with offset:[" + messageOffset + "] to Kafka");