This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f4a9b  Remove dependency on 
org.apache.pulsar.functions.utils.FunctionCommon in kafka-compat (#44)
f7f4a9b is described below

commit f7f4a9b4ca11037b7235e0522847da053e5c74a9
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Thu Mar 2 10:51:04 2023 -0400

    Remove dependency on org.apache.pulsar.functions.utils.FunctionCommon in 
kafka-compat (#44)
    
    This class is part of pulsar-functions and is built with a more recent JDK 
than the clients
---
 .../apache/kafka/clients/admin/PulsarKafkaAdminClient.java  | 13 +++++++++++--
 .../kafka/clients/consumer/PulsarConsumerCoordinator.java   |  2 --
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
index 8196a39..7d3f328 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
@@ -35,10 +35,12 @@ import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -155,7 +157,7 @@ public class PulsarKafkaAdminClient implements Admin {
                         .getLastMessageIdAsync(topicName)
                         .whenComplete((msgId, ex) -> {
                             if (ex == null) {
-                                long offset = 
FunctionCommon.getSequenceId(msgId);
+                                long offset = getSequenceId(msgId);
                                 future.complete(new 
ListOffsetsResult.ListOffsetsResultInfo(
                                         offset,
                                         System.currentTimeMillis(),
@@ -169,6 +171,13 @@ public class PulsarKafkaAdminClient implements Admin {
         return new ListOffsetsResult(new HashMap<>(futures));
     }
 
+    private static long getSequenceId(MessageId messageId) {
+        MessageIdImpl msgId = (MessageIdImpl)(messageId instanceof 
TopicMessageIdImpl ? ((TopicMessageIdImpl)messageId).getInnerMessageId() : 
messageId);
+        long ledgerId = msgId.getLedgerId();
+        long entryId = msgId.getEntryId();
+        return ledgerId << 28 | entryId;
+    }
+
     @Override
     public DeleteRecordsResult deleteRecords(Map<TopicPartition, 
RecordsToDelete> map, DeleteRecordsOptions deleteRecordsOptions) {
         final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures =
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
index 81f7c49..4e7bd18 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
@@ -18,10 +18,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import com.google.api.client.util.Maps;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter;
 import org.apache.kafka.common.TopicPartition;
 

Reply via email to