This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git

The following commit(s) were added to refs/heads/master by this push:
     new f7f4a9b  Remove dependency on org.apache.pulsar.functions.utils.FunctionCommon in kafka-compat (#44)
f7f4a9b is described below

commit f7f4a9b4ca11037b7235e0522847da053e5c74a9
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Thu Mar 2 10:51:04 2023 -0400

    Remove dependency on org.apache.pulsar.functions.utils.FunctionCommon in kafka-compat (#44)

    This class is part of pulsar-functions and is built with a more recent JDK than the clients
---
 .../apache/kafka/clients/admin/PulsarKafkaAdminClient.java | 13 +++++++++++--
 .../kafka/clients/consumer/PulsarConsumerCoordinator.java | 2 --
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
index 8196a39..7d3f328 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/admin/PulsarKafkaAdminClient.java
@@ -35,10 +35,12 @@ import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -155,7 +157,7 @@ public class PulsarKafkaAdminClient implements Admin {
                         .getLastMessageIdAsync(topicName)
                         .whenComplete((msgId, ex) -> {
                             if (ex == null) {
-                                long offset = FunctionCommon.getSequenceId(msgId);
+                                long offset = getSequenceId(msgId);
                                 future.complete(new ListOffsetsResult.ListOffsetsResultInfo(
                                         offset,
                                         System.currentTimeMillis(),
@@ -169,6 +171,13 @@ public class PulsarKafkaAdminClient implements Admin {
         return new ListOffsetsResult(new HashMap<>(futures));
     }
 
+    private static long getSequenceId(MessageId messageId) {
+        MessageIdImpl msgId = (MessageIdImpl)(messageId instanceof TopicMessageIdImpl ? ((TopicMessageIdImpl)messageId).getInnerMessageId() : messageId);
+        long ledgerId = msgId.getLedgerId();
+        long entryId = msgId.getEntryId();
+        return ledgerId << 28 | entryId;
+    }
+
     @Override
     public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> map, DeleteRecordsOptions deleteRecordsOptions) {
         final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures =
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
index 81f7c49..4e7bd18 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarConsumerCoordinator.java
@@ -18,10 +18,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import com.google.api.client.util.Maps;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter;
 import org.apache.kafka.common.TopicPartition;
 