sijie commented on a change in pull request #9927: URL: https://github.com/apache/pulsar/pull/9927#discussion_r595714308
########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import 
java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + .put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void 
write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { + if (ex == null) { + sourceRecord.ack(); + } else { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + } + throw new IllegalArgumentException(); + }); + numPendingRecords.incrementAndGet(); + flushIfNeeded(false); + } + + @Override + public void close() throws Exception { + isRunning = false; + flushIfNeeded(true); + scheduledExecutor.shutdown(); + if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) { + log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs); + } + + task.stop(); + connector.stop(); + taskContext.close(); + + log.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, Object> config, SinkContext ctx) throws Exception { + kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config); + Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set"); + topicName = kafkaSinkConfig.getTopic(); + unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); + + String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass(); + kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet() + .forEach(kv -> props.put(kv.getKey(), kv.getValue())); + + defaultKeySchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null); + defaultValueSchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null); + + Class<?> clazz = 
Class.forName(kafkaConnectorFQClassName); + connector = (SinkConnector) clazz.getConstructor().newInstance(); + + Class<? extends Task> taskClass = connector.taskClass(); + sinkContext = new PulsarKafkaSinkContext(); + connector.initialize(sinkContext); + connector.start(Maps.fromProperties(props)); + + List<Map<String, String>> configs = connector.taskConfigs(1); + configs.forEach(x -> { + x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic()); + x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl()); + }); + task = (SinkTask) taskClass.getConstructor().newInstance(); + taskContext = + new PulsarKafkaSinkTaskContext(configs.get(0), task::open); + task.initialize(taskContext); + task.start(configs.get(0)); + + batchSize = kafkaSinkConfig.getBatchSize(); + lingerMs = kafkaSinkConfig.getLingerTimeMs(); + scheduledExecutor.scheduleAtFixedRate(() -> + this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS); + + isRunning = true; + log.info("Kafka sink started : {}.", props); + } + + private void flushIfNeeded(boolean force) { + if (force || numPendingRecords.get() >= batchSize) { + scheduledExecutor.submit(this::flush); + } + } + + public void flush() { + if (log.isDebugEnabled()) { + log.debug("flush requested, pending: {}, batchSize: {}", + numPendingRecords.get(), batchSize); + } + + if (numPendingRecords.getAndSet(0) == 0) { + return; + } + + Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets(); + CompletableFuture<Void> flushCf; + synchronized (this) { + flushCf = pendingFlush; + pendingFlush = new CompletableFuture<>(); + } + + try { + task.flush(currentOffsets); + taskContext.flushOffsets(currentOffsets); + flushCf.complete(null); + } catch (Throwable t) { + log.error("error flushing pending records", t); + flushCf.completeExceptionally(t); + } + } + + /** + * org.apache.kafka.connect.data.Schema for the object + * @param obj + * @return org.apache.kafka.connect.data.Schema + */ + 
public static Schema getKafkaConnectSchemaForObject(Object obj, Schema defaultSchema) { + if (obj == null) { + return defaultSchema; + } + + if (primitiveTypeToSchema.containsKey(obj.getClass())) { + return primitiveTypeToSchema.get(obj.getClass()); + } + + // Other types are not supported yet. + // Will fallback to defaults provided. + return defaultSchema; + } + + private SinkRecord toSinkRecord(Record<Object> sourceRecord) { + final int partition = 0; Review comment: You need to specify the partition. You can get the partition id from `Record.getPartitionId`. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + 
.put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. 
Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { + if (ex == null) { + sourceRecord.ack(); + } else { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + } + throw new IllegalArgumentException(); + }); + numPendingRecords.incrementAndGet(); + flushIfNeeded(false); + } + + @Override + public void close() throws Exception { + isRunning = false; + flushIfNeeded(true); + scheduledExecutor.shutdown(); + if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) { + log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs); + } + + task.stop(); + connector.stop(); + taskContext.close(); + + log.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, Object> config, SinkContext ctx) throws Exception { + kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config); + Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set"); + topicName = kafkaSinkConfig.getTopic(); + unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); + + String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass(); + kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet() + .forEach(kv -> props.put(kv.getKey(), kv.getValue())); + + defaultKeySchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null); + defaultValueSchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null); + + Class<?> clazz = Class.forName(kafkaConnectorFQClassName); + connector = (SinkConnector) clazz.getConstructor().newInstance(); + + Class<? 
extends Task> taskClass = connector.taskClass(); + sinkContext = new PulsarKafkaSinkContext(); + connector.initialize(sinkContext); + connector.start(Maps.fromProperties(props)); + + List<Map<String, String>> configs = connector.taskConfigs(1); + configs.forEach(x -> { + x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic()); + x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl()); + }); + task = (SinkTask) taskClass.getConstructor().newInstance(); + taskContext = + new PulsarKafkaSinkTaskContext(configs.get(0), task::open); + task.initialize(taskContext); + task.start(configs.get(0)); + + batchSize = kafkaSinkConfig.getBatchSize(); + lingerMs = kafkaSinkConfig.getLingerTimeMs(); + scheduledExecutor.scheduleAtFixedRate(() -> + this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS); + + isRunning = true; + log.info("Kafka sink started : {}.", props); + } + + private void flushIfNeeded(boolean force) { + if (force || numPendingRecords.get() >= batchSize) { + scheduledExecutor.submit(this::flush); + } + } + + public void flush() { + if (log.isDebugEnabled()) { + log.debug("flush requested, pending: {}, batchSize: {}", + numPendingRecords.get(), batchSize); + } + + if (numPendingRecords.getAndSet(0) == 0) { + return; + } + + Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets(); + CompletableFuture<Void> flushCf; + synchronized (this) { + flushCf = pendingFlush; + pendingFlush = new CompletableFuture<>(); + } + + try { + task.flush(currentOffsets); + taskContext.flushOffsets(currentOffsets); + flushCf.complete(null); + } catch (Throwable t) { + log.error("error flushing pending records", t); + flushCf.completeExceptionally(t); + } + } + + /** + * org.apache.kafka.connect.data.Schema for the object + * @param obj + * @return org.apache.kafka.connect.data.Schema + */ + public static Schema getKafkaConnectSchemaForObject(Object obj, Schema defaultSchema) { + if (obj == null) { + return 
defaultSchema; + } + + if (primitiveTypeToSchema.containsKey(obj.getClass())) { + return primitiveTypeToSchema.get(obj.getClass()); + } + + // Other types are not supported yet. + // Will fallback to defaults provided. + return defaultSchema; + } + + private SinkRecord toSinkRecord(Record<Object> sourceRecord) { + final int partition = 0; + final Object key; + final Object value; + if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof KeyValue) { Review comment: You might also need to add another branch to check if `Record` is a `KVRecord`. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + 
.put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. 
Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { + if (ex == null) { + sourceRecord.ack(); + } else { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + } + throw new IllegalArgumentException(); + }); + numPendingRecords.incrementAndGet(); + flushIfNeeded(false); + } + + @Override + public void close() throws Exception { + isRunning = false; + flushIfNeeded(true); + scheduledExecutor.shutdown(); + if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) { + log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs); + } + + task.stop(); + connector.stop(); + taskContext.close(); + + log.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, Object> config, SinkContext ctx) throws Exception { + kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config); + Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set"); + topicName = kafkaSinkConfig.getTopic(); + unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); + + String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass(); + kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet() + .forEach(kv -> props.put(kv.getKey(), kv.getValue())); + + defaultKeySchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null); + defaultValueSchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null); + + Class<?> clazz = Class.forName(kafkaConnectorFQClassName); + connector = (SinkConnector) clazz.getConstructor().newInstance(); + + Class<? 
extends Task> taskClass = connector.taskClass(); + sinkContext = new PulsarKafkaSinkContext(); + connector.initialize(sinkContext); + connector.start(Maps.fromProperties(props)); + + List<Map<String, String>> configs = connector.taskConfigs(1); + configs.forEach(x -> { + x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic()); + x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl()); + }); + task = (SinkTask) taskClass.getConstructor().newInstance(); + taskContext = + new PulsarKafkaSinkTaskContext(configs.get(0), task::open); + task.initialize(taskContext); + task.start(configs.get(0)); + + batchSize = kafkaSinkConfig.getBatchSize(); + lingerMs = kafkaSinkConfig.getLingerTimeMs(); + scheduledExecutor.scheduleAtFixedRate(() -> + this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS); + + isRunning = true; + log.info("Kafka sink started : {}.", props); + } + + private void flushIfNeeded(boolean force) { + if (force || numPendingRecords.get() >= batchSize) { + scheduledExecutor.submit(this::flush); + } + } + + public void flush() { + if (log.isDebugEnabled()) { + log.debug("flush requested, pending: {}, batchSize: {}", + numPendingRecords.get(), batchSize); + } + + if (numPendingRecords.getAndSet(0) == 0) { + return; + } + + Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets(); + CompletableFuture<Void> flushCf; + synchronized (this) { + flushCf = pendingFlush; + pendingFlush = new CompletableFuture<>(); + } + + try { + task.flush(currentOffsets); + taskContext.flushOffsets(currentOffsets); + flushCf.complete(null); + } catch (Throwable t) { + log.error("error flushing pending records", t); + flushCf.completeExceptionally(t); + } + } + + /** + * org.apache.kafka.connect.data.Schema for the object + * @param obj + * @return org.apache.kafka.connect.data.Schema + */ + public static Schema getKafkaConnectSchemaForObject(Object obj, Schema defaultSchema) { + if (obj == null) { + return 
defaultSchema; + } + + if (primitiveTypeToSchema.containsKey(obj.getClass())) { + return primitiveTypeToSchema.get(obj.getClass()); + } + + // Other types are not supported yet. + // Will fallback to defaults provided. + return defaultSchema; + } + + private SinkRecord toSinkRecord(Record<Object> sourceRecord) { + final int partition = 0; + final Object key; + final Object value; + if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof KeyValue) { + KeyValue<Object, Object> kv = (KeyValue<Object, Object>) sourceRecord.getValue(); + key = kv.getKey(); + value = kv.getValue(); + } else { + key = sourceRecord.getKey().orElse(null); + value = sourceRecord.getValue(); + } + final Schema keySchema = getKafkaConnectSchemaForObject(key, defaultKeySchema); + final Schema valueSchema = getKafkaConnectSchemaForObject(value, defaultValueSchema); + + long offset = taskContext.currentOffset(topicName, partition).incrementAndGet(); Review comment: I am not sure if this is the correct logic. I believe this is the offset from the source partition. Hence, you should get the underlying Pulsar message, retrieve message-id and convert it to a Long number. You shouldn't assign an offset for it, which would result in nondeterministic behavior. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean 
unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + .put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); Review comment: final ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java ########## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.apache.kafka.connect.storage.OffsetBackingStore; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG; + +@Slf4j +public class PulsarKafkaSinkTaskContext implements SinkTaskContext { + + private final Map<String, String> config; + + private final OffsetBackingStore offsetStore; + private final String topicNamespace; + private final Consumer<Collection<TopicPartition>> onPartitionChange; + private final AtomicBoolean runRepartition = new AtomicBoolean(false); + + private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets = new ConcurrentHashMap<>(); + + public PulsarKafkaSinkTaskContext(Map<String, String> config, Review comment: A good practice is to pass the pulsar sink context to 
the Kafka sink task context wrapper. It is a clear indicator that we have wrapped the pulsar sink context as a Kafka sink task context, even if we don't actually use it. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + 
.put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { Review comment: I think the logic here is problematic, because the pendingFlush is not accessed in a `synchronized` block. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + .put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. 
Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { + if (ex == null) { + sourceRecord.ack(); + } else { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + } + throw new IllegalArgumentException(); + }); + numPendingRecords.incrementAndGet(); + flushIfNeeded(false); + } + + @Override + public void close() throws Exception { + isRunning = false; + flushIfNeeded(true); + scheduledExecutor.shutdown(); + if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) { + log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs); + } + + task.stop(); + connector.stop(); + taskContext.close(); + + log.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, Object> config, SinkContext ctx) throws Exception { Review comment: Based on Kafka sink mode, you should only run the Pulsar source with failover subscription mode. We need to validate that, because running a Kafka sink in shared subscription mode doesn't match Kafka's model. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java ########## @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.connect.connector.ConnectorContext; + +@Slf4j +public class PulsarKafkaSinkContext implements ConnectorContext { + + @Override + public void requestTaskReconfiguration() { + // noop; Review comment: throw unsupported exception to make the failure more explicit ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java ########## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.apache.kafka.connect.storage.OffsetBackingStore; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG; + +@Slf4j +public class PulsarKafkaSinkTaskContext implements SinkTaskContext { + + private final Map<String, String> config; + + private final OffsetBackingStore offsetStore; + private final String topicNamespace; + private final Consumer<Collection<TopicPartition>> onPartitionChange; + private final AtomicBoolean runRepartition = new AtomicBoolean(false); + + private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets = new ConcurrentHashMap<>(); + + public PulsarKafkaSinkTaskContext(Map<String, String> config, + Consumer<Collection<TopicPartition>> onPartitionChange) { + this.config = config; + + offsetStore = new PulsarOffsetBackingStore(); + PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config); + offsetStore.configure(pulsarKafkaWorkerConfig); + offsetStore.start(); + + this.onPartitionChange = onPartitionChange; + this.topicNamespace = pulsarKafkaWorkerConfig.getString(TOPIC_NAMESPACE_CONFIG); + } + + public void close() { + offsetStore.stop(); + } + + 
@Override + public Map<String, String> configs() { + return config; + } + + public AtomicLong currentOffset(String topic, int partition) { + return currentOffset(new TopicPartition(topic, partition)); + } + + public AtomicLong currentOffset(TopicPartition topicPartition) { + AtomicLong offset = currentOffsets.computeIfAbsent(topicPartition, kv -> { + List<ByteBuffer> req = Lists.newLinkedList(); + ByteBuffer key = topicPartitionAsKey(topicPartition); + req.add(key); + CompletableFuture<Long> offsetFuture = new CompletableFuture<>(); + offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> { + if (ex == null) { + if (result != null && result.size() != 0) { + Optional<ByteBuffer> val = result.entrySet().stream() + .filter(entry -> entry.getKey().equals(key)) + .findFirst().map(entry -> entry.getValue()); + if (val.isPresent()) { + long received = val.get().getLong(); + if (log.isDebugEnabled()) { + log.debug("read initial offset for {} == {}", topicPartition, received); + } + offsetFuture.complete(received); + return; + } + } + offsetFuture.complete(-1L); + } else { + offsetFuture.completeExceptionally(ex); + } + }); + + runRepartition.set(true); + try { + return new AtomicLong(offsetFuture.get()); + } catch (Exception e) { + log.error("error getting initial state of " + topicPartition.toString(), e); + return new AtomicLong(-1L); + } + }); + if (runRepartition.compareAndSet(true, false)) { + onPartitionChange.accept(currentOffsets.keySet()); + } + return offset; + } + + public Map<TopicPartition, OffsetAndMetadata> currentOffsets() { + + Map<TopicPartition, OffsetAndMetadata> snapshot = Maps.newHashMapWithExpectedSize(currentOffsets.size()); + currentOffsets.forEach((topicPartition, offset) -> { + if (offset.get() > 0) { + snapshot.put(topicPartition, + new OffsetAndMetadata(offset.get(), Optional.empty(), null)); + } + }); + return snapshot; + } + + private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) { + return 
ByteBuffer.wrap((topicNamespace + "/" + topicPartition.toString()).getBytes(UTF_8)); + + } + + private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, TopicPartition topicPartition, long l) { + ByteBuffer key = topicPartitionAsKey(topicPartition); + ByteBuffer value = ByteBuffer.allocate(Long.BYTES); + value.putLong(l); + value.flip(); + offsetMap.put(key, value); + } + + @SneakyThrows + @Override + public void offset(Map<TopicPartition, Long> map) { + map.forEach((key, value) -> { + if (!currentOffsets.containsKey(key)) { + runRepartition.set(true); + } + currentOffsets.put(key, new AtomicLong(value)); + }); + + if (runRepartition.compareAndSet(true, false)) { + onPartitionChange.accept(currentOffsets.keySet()); + } + } + + @Override + public void offset(TopicPartition topicPartition, long l) { Review comment: Same comment as above. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + 
.put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. 
Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { + if (ex == null) { + sourceRecord.ack(); + } else { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + } + throw new IllegalArgumentException(); + }); + numPendingRecords.incrementAndGet(); + flushIfNeeded(false); + } + + @Override + public void close() throws Exception { + isRunning = false; + flushIfNeeded(true); + scheduledExecutor.shutdown(); + if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) { + log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs); + } + + task.stop(); + connector.stop(); + taskContext.close(); + + log.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, Object> config, SinkContext ctx) throws Exception { + kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config); + Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set"); + topicName = kafkaSinkConfig.getTopic(); + unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); + + String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass(); + kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet() + .forEach(kv -> props.put(kv.getKey(), kv.getValue())); + + defaultKeySchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null); + defaultValueSchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null); + + Class<?> clazz = Class.forName(kafkaConnectorFQClassName); + connector = (SinkConnector) clazz.getConstructor().newInstance(); + + Class<? 
extends Task> taskClass = connector.taskClass(); + sinkContext = new PulsarKafkaSinkContext(); + connector.initialize(sinkContext); + connector.start(Maps.fromProperties(props)); + + List<Map<String, String>> configs = connector.taskConfigs(1); + configs.forEach(x -> { + x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic()); + x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl()); + }); + task = (SinkTask) taskClass.getConstructor().newInstance(); + taskContext = + new PulsarKafkaSinkTaskContext(configs.get(0), task::open); + task.initialize(taskContext); + task.start(configs.get(0)); + + batchSize = kafkaSinkConfig.getBatchSize(); + lingerMs = kafkaSinkConfig.getLingerTimeMs(); + scheduledExecutor.scheduleAtFixedRate(() -> + this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS); + + isRunning = true; + log.info("Kafka sink started : {}.", props); + } + + private void flushIfNeeded(boolean force) { + if (force || numPendingRecords.get() >= batchSize) { + scheduledExecutor.submit(this::flush); + } + } + + public void flush() { + if (log.isDebugEnabled()) { + log.debug("flush requested, pending: {}, batchSize: {}", + numPendingRecords.get(), batchSize); + } + + if (numPendingRecords.getAndSet(0) == 0) { + return; + } + + Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets(); + CompletableFuture<Void> flushCf; + synchronized (this) { + flushCf = pendingFlush; + pendingFlush = new CompletableFuture<>(); + } + + try { + task.flush(currentOffsets); + taskContext.flushOffsets(currentOffsets); + flushCf.complete(null); + } catch (Throwable t) { + log.error("error flushing pending records", t); + flushCf.completeExceptionally(t); + } + } + + /** + * org.apache.kafka.connect.data.Schema for the object + * @param obj + * @return org.apache.kafka.connect.data.Schema + */ + public static Schema getKafkaConnectSchemaForObject(Object obj, Schema defaultSchema) { + if (obj == null) { + return 
defaultSchema; + } + + if (primitiveTypeToSchema.containsKey(obj.getClass())) { + return primitiveTypeToSchema.get(obj.getClass()); + } + + // Other types are not supported yet. + // Will fallback to defaults provided. + return defaultSchema; + } + + private SinkRecord toSinkRecord(Record<Object> sourceRecord) { + final int partition = 0; + final Object key; + final Object value; + if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof KeyValue) { + KeyValue<Object, Object> kv = (KeyValue<Object, Object>) sourceRecord.getValue(); + key = kv.getKey(); + value = kv.getValue(); + } else { + key = sourceRecord.getKey().orElse(null); + value = sourceRecord.getValue(); + } + final Schema keySchema = getKafkaConnectSchemaForObject(key, defaultKeySchema); + final Schema valueSchema = getKafkaConnectSchemaForObject(value, defaultValueSchema); + + long offset = taskContext.currentOffset(topicName, partition).incrementAndGet(); + SinkRecord sinkRecord = new SinkRecord(topicName, + partition, + keySchema, + key, + valueSchema, + value, + offset, + sourceRecord.getEventTime().orElse(null), Review comment: If event time is not specified, we should consider using publish time, or having a configuration option that allows people to configure which timestamp to use. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean 
unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + .put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. 
Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { + if (ex == null) { + sourceRecord.ack(); + } else { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + } + throw new IllegalArgumentException(); + }); + numPendingRecords.incrementAndGet(); + flushIfNeeded(false); + } + + @Override + public void close() throws Exception { + isRunning = false; + flushIfNeeded(true); + scheduledExecutor.shutdown(); + if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) { + log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs); + } + + task.stop(); + connector.stop(); + taskContext.close(); + + log.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, Object> config, SinkContext ctx) throws Exception { + kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config); + Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set"); + topicName = kafkaSinkConfig.getTopic(); + unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); + + String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass(); + kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet() + .forEach(kv -> props.put(kv.getKey(), kv.getValue())); + + defaultKeySchema = (Schema)Schema.class Review comment: I am not sure we need the default key and value schema here. A pulsar record carries the pulsar schema information. We should just write a util function to convert pulsar schema to kafka schema. If there is no schema, then use Kafka bytes schema. 
########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java ########## @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import lombok.Data; +import lombok.experimental.Accessors; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import org.apache.pulsar.io.core.annotations.FieldDoc; + +@Data +@Accessors(chain = true) +public class PulsarKafkaConnectSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + @FieldDoc( + defaultValue = "1", + help = "The batch size that Kafka producer will attempt to batch records together.") + private int batchSize = 1; Review comment: Why is batchSize 1? ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + .put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. 
Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { Review comment: I also don't think you need to register the callback here. Instead, I think you need to track the pulsar source records in the context in order to make sure the "offsets" are correctly recorded. once you call kafka sink to flush, you need to call sink context to flush. If you successfully flush, you can call sourceRecord to ack, otherwise you fail the source records. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java ########## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.apache.kafka.connect.storage.OffsetBackingStore; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG; + +@Slf4j +public class PulsarKafkaSinkTaskContext implements SinkTaskContext { + + private final Map<String, String> config; + + private final OffsetBackingStore offsetStore; + private final String topicNamespace; + private final Consumer<Collection<TopicPartition>> onPartitionChange; + private final AtomicBoolean runRepartition = new AtomicBoolean(false); + + private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets = new ConcurrentHashMap<>(); + + public PulsarKafkaSinkTaskContext(Map<String, String> config, + Consumer<Collection<TopicPartition>> onPartitionChange) { + this.config = config; + + offsetStore = new PulsarOffsetBackingStore(); + PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config); + offsetStore.configure(pulsarKafkaWorkerConfig); + offsetStore.start(); + + this.onPartitionChange = onPartitionChange; + this.topicNamespace = pulsarKafkaWorkerConfig.getString(TOPIC_NAMESPACE_CONFIG); + } + + public void close() { + offsetStore.stop(); + } + + 
@Override + public Map<String, String> configs() { + return config; + } + + public AtomicLong currentOffset(String topic, int partition) { + return currentOffset(new TopicPartition(topic, partition)); + } + + public AtomicLong currentOffset(TopicPartition topicPartition) { + AtomicLong offset = currentOffsets.computeIfAbsent(topicPartition, kv -> { + List<ByteBuffer> req = Lists.newLinkedList(); + ByteBuffer key = topicPartitionAsKey(topicPartition); + req.add(key); + CompletableFuture<Long> offsetFuture = new CompletableFuture<>(); + offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> { + if (ex == null) { + if (result != null && result.size() != 0) { + Optional<ByteBuffer> val = result.entrySet().stream() + .filter(entry -> entry.getKey().equals(key)) + .findFirst().map(entry -> entry.getValue()); + if (val.isPresent()) { + long received = val.get().getLong(); + if (log.isDebugEnabled()) { + log.debug("read initial offset for {} == {}", topicPartition, received); + } + offsetFuture.complete(received); + return; + } + } + offsetFuture.complete(-1L); + } else { + offsetFuture.completeExceptionally(ex); + } + }); + + runRepartition.set(true); + try { + return new AtomicLong(offsetFuture.get()); + } catch (Exception e) { + log.error("error getting initial state of " + topicPartition.toString(), e); + return new AtomicLong(-1L); + } + }); + if (runRepartition.compareAndSet(true, false)) { + onPartitionChange.accept(currentOffsets.keySet()); + } + return offset; + } + + public Map<TopicPartition, OffsetAndMetadata> currentOffsets() { + + Map<TopicPartition, OffsetAndMetadata> snapshot = Maps.newHashMapWithExpectedSize(currentOffsets.size()); + currentOffsets.forEach((topicPartition, offset) -> { + if (offset.get() > 0) { + snapshot.put(topicPartition, + new OffsetAndMetadata(offset.get(), Optional.empty(), null)); + } + }); + return snapshot; + } + + private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) { + return 
ByteBuffer.wrap((topicNamespace + "/" + topicPartition.toString()).getBytes(UTF_8)); + + } + + private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, TopicPartition topicPartition, long l) { + ByteBuffer key = topicPartitionAsKey(topicPartition); + ByteBuffer value = ByteBuffer.allocate(Long.BYTES); + value.putLong(l); + value.flip(); + offsetMap.put(key, value); + } + + @SneakyThrows + @Override + public void offset(Map<TopicPartition, Long> map) { Review comment: This should be implemented. If the offset (messageId) is reset, you need to reset the cursor of the input topic. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java ########## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.apache.kafka.connect.storage.OffsetBackingStore; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG; + +@Slf4j +public class PulsarKafkaSinkTaskContext implements SinkTaskContext { + + private final Map<String, String> config; + + private final OffsetBackingStore offsetStore; + private final String topicNamespace; + private final Consumer<Collection<TopicPartition>> onPartitionChange; + private final AtomicBoolean runRepartition = new AtomicBoolean(false); + + private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets = new ConcurrentHashMap<>(); Review comment: The offset management here is completely wrong. It would result in a lot of duplicates and inconsistent behavior when sink connectors fail. You shouldn't be responsible for assigning offsets. The offsets are coming from the Pulsar records. Here you should manage the "offsets" (messageIds) of the records a Pulsar sink receives from the pulsar input topic. 
########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java ########## @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import 
java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG; + +@Slf4j +public class KafkaConnectSink implements Sink<Object> { + + private boolean unwrapKeyValueIfAvailable; + + private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema; + static { + primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder() + .put(Boolean.class, Schema.BOOLEAN_SCHEMA) + .put(Byte.class, Schema.INT8_SCHEMA) + .put(Short.class, Schema.INT16_SCHEMA) + .put(Integer.class, Schema.INT32_SCHEMA) + .put(Long.class, Schema.INT64_SCHEMA) + .put(Float.class, Schema.FLOAT32_SCHEMA) + .put(Double.class, Schema.FLOAT64_SCHEMA) + .put(String.class, Schema.STRING_SCHEMA) + .put(byte[].class, Schema.BYTES_SCHEMA) + .build(); + } + + private PulsarKafkaSinkContext sinkContext; + private PulsarKafkaSinkTaskContext taskContext; + private SinkConnector connector; + private SinkTask task; + + private Schema defaultKeySchema; + private Schema defaultValueSchema; + + + private int batchSize; + private long lingerMs; + private final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d") + .build()); + private final AtomicInteger numPendingRecords = new AtomicInteger(0); + + private volatile CompletableFuture<Void> pendingFlush = new CompletableFuture<>(); + private volatile boolean isRunning = false; + + private Properties props = new Properties(); + private PulsarKafkaConnectSinkConfig kafkaSinkConfig; + + protected String topicName; + + @Override + public void 
write(Record<Object> sourceRecord) { + if (log.isDebugEnabled()) { + log.debug("Record sending to kafka, record={}.", sourceRecord); + } + + if (!isRunning) { + log.error("Sink is stopped. Cannot send the record {}", sourceRecord); + sourceRecord.fail(); + return; + } + + try { + SinkRecord record = toSinkRecord(sourceRecord); + task.put(Lists.newArrayList(record)); + } catch (Exception ex) { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + return; + } + pendingFlush.whenComplete((ignore, ex) -> { + if (ex == null) { + sourceRecord.ack(); + } else { + log.error("Error sending the record {}", sourceRecord, ex); + sourceRecord.fail(); + } + throw new IllegalArgumentException(); + }); + numPendingRecords.incrementAndGet(); + flushIfNeeded(false); + } + + @Override + public void close() throws Exception { + isRunning = false; + flushIfNeeded(true); + scheduledExecutor.shutdown(); + if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) { + log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs); + } + + task.stop(); + connector.stop(); + taskContext.close(); + + log.info("Kafka sink stopped."); + } + + @Override + public void open(Map<String, Object> config, SinkContext ctx) throws Exception { + kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config); + Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set"); + topicName = kafkaSinkConfig.getTopic(); + unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); + + String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass(); + kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet() + .forEach(kv -> props.put(kv.getKey(), kv.getValue())); + + defaultKeySchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null); + defaultValueSchema = (Schema)Schema.class + .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null); + + Class<?> clazz = 
Class.forName(kafkaConnectorFQClassName); + connector = (SinkConnector) clazz.getConstructor().newInstance(); + + Class<? extends Task> taskClass = connector.taskClass(); + sinkContext = new PulsarKafkaSinkContext(); + connector.initialize(sinkContext); + connector.start(Maps.fromProperties(props)); + + List<Map<String, String>> configs = connector.taskConfigs(1); + configs.forEach(x -> { + x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic()); + x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl()); + }); + task = (SinkTask) taskClass.getConstructor().newInstance(); + taskContext = + new PulsarKafkaSinkTaskContext(configs.get(0), task::open); + task.initialize(taskContext); + task.start(configs.get(0)); + + batchSize = kafkaSinkConfig.getBatchSize(); + lingerMs = kafkaSinkConfig.getLingerTimeMs(); + scheduledExecutor.scheduleAtFixedRate(() -> + this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS); + + isRunning = true; + log.info("Kafka sink started : {}.", props); + } + + private void flushIfNeeded(boolean force) { + if (force || numPendingRecords.get() >= batchSize) { + scheduledExecutor.submit(this::flush); + } + } + + public void flush() { + if (log.isDebugEnabled()) { + log.debug("flush requested, pending: {}, batchSize: {}", + numPendingRecords.get(), batchSize); + } + + if (numPendingRecords.getAndSet(0) == 0) { + return; + } + + Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets(); + CompletableFuture<Void> flushCf; + synchronized (this) { + flushCf = pendingFlush; + pendingFlush = new CompletableFuture<>(); + } + + try { + task.flush(currentOffsets); + taskContext.flushOffsets(currentOffsets); + flushCf.complete(null); + } catch (Throwable t) { + log.error("error flushing pending records", t); + flushCf.completeExceptionally(t); + } + } + + /** + * org.apache.kafka.connect.data.Schema for the object + * @param obj + * @return org.apache.kafka.connect.data.Schema + */ + 
public static Schema getKafkaConnectSchemaForObject(Object obj, Schema defaultSchema) { + if (obj == null) { + return defaultSchema; + } + + if (primitiveTypeToSchema.containsKey(obj.getClass())) { + return primitiveTypeToSchema.get(obj.getClass()); + } + + // Other types are not supported yet. + // Will fallback to defaults provided. + return defaultSchema; + } + + private SinkRecord toSinkRecord(Record<Object> sourceRecord) { + final int partition = 0; + final Object key; + final Object value; + if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof KeyValue) { + KeyValue<Object, Object> kv = (KeyValue<Object, Object>) sourceRecord.getValue(); + key = kv.getKey(); + value = kv.getValue(); + } else { + key = sourceRecord.getKey().orElse(null); + value = sourceRecord.getValue(); + } + final Schema keySchema = getKafkaConnectSchemaForObject(key, defaultKeySchema); + final Schema valueSchema = getKafkaConnectSchemaForObject(value, defaultValueSchema); + + long offset = taskContext.currentOffset(topicName, partition).incrementAndGet(); + SinkRecord sinkRecord = new SinkRecord(topicName, + partition, + keySchema, + key, + valueSchema, + value, + offset, + sourceRecord.getEventTime().orElse(null), + TimestampType.NO_TIMESTAMP_TYPE); Review comment: The timestamp type should be determined based on which timestamp is used. ########## File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java ########## @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.kafka.connect; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; +import org.apache.kafka.connect.storage.OffsetBackingStore; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG; + +@Slf4j +public class PulsarKafkaSinkTaskContext implements SinkTaskContext { + + private final Map<String, String> config; + + private final OffsetBackingStore offsetStore; + private final String topicNamespace; + private final Consumer<Collection<TopicPartition>> onPartitionChange; + private final AtomicBoolean runRepartition = new AtomicBoolean(false); + + private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets = new ConcurrentHashMap<>(); + + public PulsarKafkaSinkTaskContext(Map<String, String> config, + Consumer<Collection<TopicPartition>> onPartitionChange) { + 
this.config = config; + + offsetStore = new PulsarOffsetBackingStore(); + PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config); + offsetStore.configure(pulsarKafkaWorkerConfig); + offsetStore.start(); + + this.onPartitionChange = onPartitionChange; + this.topicNamespace = pulsarKafkaWorkerConfig.getString(TOPIC_NAMESPACE_CONFIG); + } + + public void close() { + offsetStore.stop(); + } + + @Override + public Map<String, String> configs() { + return config; + } + + public AtomicLong currentOffset(String topic, int partition) { + return currentOffset(new TopicPartition(topic, partition)); + } + + public AtomicLong currentOffset(TopicPartition topicPartition) { + AtomicLong offset = currentOffsets.computeIfAbsent(topicPartition, kv -> { + List<ByteBuffer> req = Lists.newLinkedList(); + ByteBuffer key = topicPartitionAsKey(topicPartition); + req.add(key); + CompletableFuture<Long> offsetFuture = new CompletableFuture<>(); + offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> result) -> { + if (ex == null) { + if (result != null && result.size() != 0) { + Optional<ByteBuffer> val = result.entrySet().stream() + .filter(entry -> entry.getKey().equals(key)) + .findFirst().map(entry -> entry.getValue()); + if (val.isPresent()) { + long received = val.get().getLong(); + if (log.isDebugEnabled()) { + log.debug("read initial offset for {} == {}", topicPartition, received); + } + offsetFuture.complete(received); + return; + } + } + offsetFuture.complete(-1L); + } else { + offsetFuture.completeExceptionally(ex); + } + }); + + runRepartition.set(true); + try { + return new AtomicLong(offsetFuture.get()); + } catch (Exception e) { + log.error("error getting initial state of " + topicPartition.toString(), e); + return new AtomicLong(-1L); + } + }); + if (runRepartition.compareAndSet(true, false)) { + onPartitionChange.accept(currentOffsets.keySet()); + } + return offset; + } + + public Map<TopicPartition, OffsetAndMetadata> 
currentOffsets() { + + Map<TopicPartition, OffsetAndMetadata> snapshot = Maps.newHashMapWithExpectedSize(currentOffsets.size()); + currentOffsets.forEach((topicPartition, offset) -> { + if (offset.get() > 0) { + snapshot.put(topicPartition, + new OffsetAndMetadata(offset.get(), Optional.empty(), null)); + } + }); + return snapshot; + } + + private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) { + return ByteBuffer.wrap((topicNamespace + "/" + topicPartition.toString()).getBytes(UTF_8)); + + } + + private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, TopicPartition topicPartition, long l) { + ByteBuffer key = topicPartitionAsKey(topicPartition); + ByteBuffer value = ByteBuffer.allocate(Long.BYTES); + value.putLong(l); + value.flip(); + offsetMap.put(key, value); + } + + @SneakyThrows + @Override + public void offset(Map<TopicPartition, Long> map) { + map.forEach((key, value) -> { + if (!currentOffsets.containsKey(key)) { + runRepartition.set(true); + } + currentOffsets.put(key, new AtomicLong(value)); + }); + + if (runRepartition.compareAndSet(true, false)) { + onPartitionChange.accept(currentOffsets.keySet()); + } + } + + @Override + public void offset(TopicPartition topicPartition, long l) { + Map<TopicPartition, Long> map = Maps.newHashMap(); + map.put(topicPartition, l); + this.offset(map); + } + + @Override + public void timeout(long l) { Review comment: you should implement this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org