sijie commented on a change in pull request #9927:
URL: https://github.com/apache/pulsar/pull/9927#discussion_r595714308



##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                sourceRecord.ack();
+            } else {
+                log.error("Error sending the record {}", sourceRecord, ex);
+                sourceRecord.fail();
+            }
+            throw new IllegalArgumentException();
+        });
+        numPendingRecords.incrementAndGet();
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, 
TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * 
lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws 
Exception {
+        kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not 
set");
+        topicName = kafkaSinkConfig.getTopic();
+        unwrapKeyValueIfAvailable = 
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+
+        String kafkaConnectorFQClassName = 
kafkaSinkConfig.getKafkaConnectorSinkClass();
+        kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet()
+                .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+
+        defaultKeySchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null);
+        defaultValueSchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null);
+
+        Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+        connector = (SinkConnector) clazz.getConstructor().newInstance();
+
+        Class<? extends Task> taskClass = connector.taskClass();
+        sinkContext = new PulsarKafkaSinkContext();
+        connector.initialize(sinkContext);
+        connector.start(Maps.fromProperties(props));
+
+        List<Map<String, String>> configs = connector.taskConfigs(1);
+        configs.forEach(x -> {
+            x.put(OFFSET_STORAGE_TOPIC_CONFIG, 
kafkaSinkConfig.getOffsetStorageTopic());
+            x.put(PULSAR_SERVICE_URL_CONFIG, 
kafkaSinkConfig.getPulsarServiceUrl());
+        });
+        task = (SinkTask) taskClass.getConstructor().newInstance();
+        taskContext =
+                new PulsarKafkaSinkTaskContext(configs.get(0), task::open);
+        task.initialize(taskContext);
+        task.start(configs.get(0));
+
+        batchSize = kafkaSinkConfig.getBatchSize();
+        lingerMs = kafkaSinkConfig.getLingerTimeMs();
+        scheduledExecutor.scheduleAtFixedRate(() ->
+                this.flushIfNeeded(true), lingerMs, lingerMs, 
TimeUnit.MILLISECONDS);
+
+        isRunning = true;
+        log.info("Kafka sink started : {}.", props);
+    }
+
+    private void flushIfNeeded(boolean force) {
+        if (force || numPendingRecords.get() >= batchSize) {
+            scheduledExecutor.submit(this::flush);
+        }
+    }
+
+    public void flush() {
+        if (log.isDebugEnabled()) {
+            log.debug("flush requested, pending: {}, batchSize: {}",
+                    numPendingRecords.get(), batchSize);
+        }
+
+        if (numPendingRecords.getAndSet(0) == 0) {
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> currentOffsets = 
taskContext.currentOffsets();
+        CompletableFuture<Void> flushCf;
+        synchronized (this) {
+            flushCf = pendingFlush;
+            pendingFlush = new CompletableFuture<>();
+        }
+
+        try {
+            task.flush(currentOffsets);
+            taskContext.flushOffsets(currentOffsets);
+            flushCf.complete(null);
+        } catch (Throwable t) {
+            log.error("error flushing pending records", t);
+            flushCf.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * org.apache.kafka.connect.data.Schema for the object
+     * @param obj
+     * @return org.apache.kafka.connect.data.Schema
+     */
+    public static Schema getKafkaConnectSchemaForObject(Object obj, Schema 
defaultSchema) {
+        if (obj == null) {
+            return defaultSchema;
+        }
+
+        if (primitiveTypeToSchema.containsKey(obj.getClass())) {
+            return primitiveTypeToSchema.get(obj.getClass());
+        }
+
+        // Other types are not supported yet.
+        // Will fallback to defaults provided.
+        return defaultSchema;
+    }
+
+    private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
+        final int partition = 0;

Review comment:
       You need to specify the actual partition; you can get the partition id from `Record.getPartitionId`.
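
       A hedged sketch of deriving the partition from the record rather than hardcoding 0; it assumes the partition id string ends with the numeric partition index, which should be verified against the Pulsar Functions API:

```java
// Sketch only: derive the Kafka partition from the Pulsar record instead of a constant 0.
// Assumes Record.getPartitionId() returns an Optional<String> ending with "-<index>".
final int partition = sourceRecord.getPartitionId()
        .map(id -> {
            int dash = id.lastIndexOf('-');
            try {
                return dash >= 0 ? Integer.parseInt(id.substring(dash + 1)) : 0;
            } catch (NumberFormatException nfe) {
                return 0; // fall back if the suffix is not numeric
            }
        })
        .orElse(0);
```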

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                sourceRecord.ack();
+            } else {
+                log.error("Error sending the record {}", sourceRecord, ex);
+                sourceRecord.fail();
+            }
+            throw new IllegalArgumentException();
+        });
+        numPendingRecords.incrementAndGet();
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, 
TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * 
lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws 
Exception {
+        kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not 
set");
+        topicName = kafkaSinkConfig.getTopic();
+        unwrapKeyValueIfAvailable = 
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+
+        String kafkaConnectorFQClassName = 
kafkaSinkConfig.getKafkaConnectorSinkClass();
+        kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet()
+                .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+
+        defaultKeySchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null);
+        defaultValueSchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null);
+
+        Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+        connector = (SinkConnector) clazz.getConstructor().newInstance();
+
+        Class<? extends Task> taskClass = connector.taskClass();
+        sinkContext = new PulsarKafkaSinkContext();
+        connector.initialize(sinkContext);
+        connector.start(Maps.fromProperties(props));
+
+        List<Map<String, String>> configs = connector.taskConfigs(1);
+        configs.forEach(x -> {
+            x.put(OFFSET_STORAGE_TOPIC_CONFIG, 
kafkaSinkConfig.getOffsetStorageTopic());
+            x.put(PULSAR_SERVICE_URL_CONFIG, 
kafkaSinkConfig.getPulsarServiceUrl());
+        });
+        task = (SinkTask) taskClass.getConstructor().newInstance();
+        taskContext =
+                new PulsarKafkaSinkTaskContext(configs.get(0), task::open);
+        task.initialize(taskContext);
+        task.start(configs.get(0));
+
+        batchSize = kafkaSinkConfig.getBatchSize();
+        lingerMs = kafkaSinkConfig.getLingerTimeMs();
+        scheduledExecutor.scheduleAtFixedRate(() ->
+                this.flushIfNeeded(true), lingerMs, lingerMs, 
TimeUnit.MILLISECONDS);
+
+        isRunning = true;
+        log.info("Kafka sink started : {}.", props);
+    }
+
+    private void flushIfNeeded(boolean force) {
+        if (force || numPendingRecords.get() >= batchSize) {
+            scheduledExecutor.submit(this::flush);
+        }
+    }
+
+    public void flush() {
+        if (log.isDebugEnabled()) {
+            log.debug("flush requested, pending: {}, batchSize: {}",
+                    numPendingRecords.get(), batchSize);
+        }
+
+        if (numPendingRecords.getAndSet(0) == 0) {
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> currentOffsets = 
taskContext.currentOffsets();
+        CompletableFuture<Void> flushCf;
+        synchronized (this) {
+            flushCf = pendingFlush;
+            pendingFlush = new CompletableFuture<>();
+        }
+
+        try {
+            task.flush(currentOffsets);
+            taskContext.flushOffsets(currentOffsets);
+            flushCf.complete(null);
+        } catch (Throwable t) {
+            log.error("error flushing pending records", t);
+            flushCf.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * org.apache.kafka.connect.data.Schema for the object
+     * @param obj
+     * @return org.apache.kafka.connect.data.Schema
+     */
+    public static Schema getKafkaConnectSchemaForObject(Object obj, Schema 
defaultSchema) {
+        if (obj == null) {
+            return defaultSchema;
+        }
+
+        if (primitiveTypeToSchema.containsKey(obj.getClass())) {
+            return primitiveTypeToSchema.get(obj.getClass());
+        }
+
+        // Other types are not supported yet.
+        // Will fallback to defaults provided.
+        return defaultSchema;
+    }
+
+    private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
+        final int partition = 0;
+        final Object key;
+        final Object value;
+        if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof 
KeyValue) {

Review comment:
       You might also need another branch to check whether the `Record` is a `KVRecord`.
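
       A hedged sketch of what that branch could look like; `KVRecord` and its accessors are assumptions to verify against the Pulsar Functions API in use:

```java
// Sketch only: handle records that arrive as a KVRecord, which carries explicit key/value schemas.
if (sourceRecord instanceof KVRecord) {
    KVRecord<Object, Object> kvRecord = (KVRecord<Object, Object>) sourceRecord;
    key = kvRecord.getValue().getKey();
    value = kvRecord.getValue().getValue();
    // kvRecord.getKeySchema() / kvRecord.getValueSchema() could also drive the Kafka Connect schemas
} else if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof KeyValue) {
    // existing KeyValue unwrapping branch
}
```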

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                sourceRecord.ack();
+            } else {
+                log.error("Error sending the record {}", sourceRecord, ex);
+                sourceRecord.fail();
+            }
+            throw new IllegalArgumentException();
+        });
+        numPendingRecords.incrementAndGet();
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, 
TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * 
lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws 
Exception {
+        kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not 
set");
+        topicName = kafkaSinkConfig.getTopic();
+        unwrapKeyValueIfAvailable = 
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+
+        String kafkaConnectorFQClassName = 
kafkaSinkConfig.getKafkaConnectorSinkClass();
+        kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet()
+                .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+
+        defaultKeySchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null);
+        defaultValueSchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null);
+
+        Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+        connector = (SinkConnector) clazz.getConstructor().newInstance();
+
+        Class<? extends Task> taskClass = connector.taskClass();
+        sinkContext = new PulsarKafkaSinkContext();
+        connector.initialize(sinkContext);
+        connector.start(Maps.fromProperties(props));
+
+        List<Map<String, String>> configs = connector.taskConfigs(1);
+        configs.forEach(x -> {
+            x.put(OFFSET_STORAGE_TOPIC_CONFIG, 
kafkaSinkConfig.getOffsetStorageTopic());
+            x.put(PULSAR_SERVICE_URL_CONFIG, 
kafkaSinkConfig.getPulsarServiceUrl());
+        });
+        task = (SinkTask) taskClass.getConstructor().newInstance();
+        taskContext =
+                new PulsarKafkaSinkTaskContext(configs.get(0), task::open);
+        task.initialize(taskContext);
+        task.start(configs.get(0));
+
+        batchSize = kafkaSinkConfig.getBatchSize();
+        lingerMs = kafkaSinkConfig.getLingerTimeMs();
+        scheduledExecutor.scheduleAtFixedRate(() ->
+                this.flushIfNeeded(true), lingerMs, lingerMs, 
TimeUnit.MILLISECONDS);
+
+        isRunning = true;
+        log.info("Kafka sink started : {}.", props);
+    }
+
+    private void flushIfNeeded(boolean force) {
+        if (force || numPendingRecords.get() >= batchSize) {
+            scheduledExecutor.submit(this::flush);
+        }
+    }
+
+    public void flush() {
+        if (log.isDebugEnabled()) {
+            log.debug("flush requested, pending: {}, batchSize: {}",
+                    numPendingRecords.get(), batchSize);
+        }
+
+        if (numPendingRecords.getAndSet(0) == 0) {
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> currentOffsets = 
taskContext.currentOffsets();
+        CompletableFuture<Void> flushCf;
+        synchronized (this) {
+            flushCf = pendingFlush;
+            pendingFlush = new CompletableFuture<>();
+        }
+
+        try {
+            task.flush(currentOffsets);
+            taskContext.flushOffsets(currentOffsets);
+            flushCf.complete(null);
+        } catch (Throwable t) {
+            log.error("error flushing pending records", t);
+            flushCf.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * org.apache.kafka.connect.data.Schema for the object
+     * @param obj
+     * @return org.apache.kafka.connect.data.Schema
+     */
+    public static Schema getKafkaConnectSchemaForObject(Object obj, Schema 
defaultSchema) {
+        if (obj == null) {
+            return defaultSchema;
+        }
+
+        if (primitiveTypeToSchema.containsKey(obj.getClass())) {
+            return primitiveTypeToSchema.get(obj.getClass());
+        }
+
+        // Other types are not supported yet.
+        // Will fallback to defaults provided.
+        return defaultSchema;
+    }
+
+    private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
+        final int partition = 0;
+        final Object key;
+        final Object value;
+        if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof 
KeyValue) {
+            KeyValue<Object, Object> kv = (KeyValue<Object, Object>) 
sourceRecord.getValue();
+            key = kv.getKey();
+            value = kv.getValue();
+        } else {
+            key = sourceRecord.getKey().orElse(null);
+            value = sourceRecord.getValue();
+        }
+        final Schema keySchema = getKafkaConnectSchemaForObject(key, 
defaultKeySchema);
+        final Schema valueSchema = getKafkaConnectSchemaForObject(value, 
defaultValueSchema);
+
+        long offset = taskContext.currentOffset(topicName, 
partition).incrementAndGet();

Review comment:
       I am not sure this is the correct logic. I believe this should be the offset from the source partition. Hence, you should get the underlying Pulsar message, retrieve its message id, and convert it to a long. You shouldn't assign an offset yourself, which would result in nondeterministic behavior.
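
       A hedged sketch of using a deterministic offset derived from the record itself; whether `getRecordSequence()` is populated from the message id should be verified:

```java
// Sketch only: prefer an offset derived from the Pulsar message over a locally incremented counter.
long offset = sourceRecord.getRecordSequence()
        .orElseGet(() -> taskContext.currentOffset(topicName, partition).incrementAndGet());
```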

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();

Review comment:
       This field can be declared `final`.
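
       For example:

```java
private final Properties props = new Properties();
```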

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
##########
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
+
+@Slf4j
+public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
+
+    private final Map<String, String> config;
+
+    private final OffsetBackingStore offsetStore;
+    private final String topicNamespace;
+    private final Consumer<Collection<TopicPartition>> onPartitionChange;
+    private final AtomicBoolean runRepartition = new AtomicBoolean(false);
+
+    private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets 
= new ConcurrentHashMap<>();
+
+    public PulsarKafkaSinkTaskContext(Map<String, String> config,

Review comment:
       A good practice is to pass the Pulsar sink context to the Kafka sink task context wrapper. It is a clear indicator that we have wrapped the Pulsar sink context as a Kafka sink task context, even if we don't actually use it yet.
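
       A hedged sketch of threading the context through the constructor; the `pulsarSinkContext` parameter and field are illustrative names, not part of the current code:

```java
// Sketch only: keep a reference to the Pulsar SinkContext to make the wrapping explicit.
public PulsarKafkaSinkTaskContext(Map<String, String> config,
                                  SinkContext pulsarSinkContext,
                                  Consumer<Collection<TopicPartition>> onPartitionChange) {
    this.config = config;
    this.pulsarSinkContext = pulsarSinkContext; // hypothetical field, unused for now
    // ... remaining initialization (offset store, onPartitionChange, topicNamespace) unchanged
}
```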

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {

Review comment:
       I think the logic here is problematic, because `pendingFlush` is not accessed in a `synchronized` block.
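
       A minimal sketch of one way to address this, taking the same monitor that `flush()` holds when it swaps `pendingFlush`; this is only an illustration, not the author's implementation:

```java
// Sketch only: read pendingFlush under the lock used by flush(), so the ack/fail callback
// is attached to the flush cycle this record actually belongs to.
final CompletableFuture<Void> flushToWaitFor;
synchronized (this) {
    flushToWaitFor = pendingFlush;
    numPendingRecords.incrementAndGet();
}
flushToWaitFor.whenComplete((ignore, ex) -> {
    if (ex == null) {
        sourceRecord.ack();
    } else {
        log.error("Error sending the record {}", sourceRecord, ex);
        sourceRecord.fail();
    }
});
flushIfNeeded(false);
```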

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                sourceRecord.ack();
+            } else {
+                log.error("Error sending the record {}", sourceRecord, ex);
+                sourceRecord.fail();
+            }
+            throw new IllegalArgumentException();
+        });
+        numPendingRecords.incrementAndGet();
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, 
TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * 
lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws 
Exception {

Review comment:
       Based on the Kafka sink model, you should only run this sink with a failover subscription, and we need to validate that: running a Kafka sink on a shared subscription doesn't match Kafka's model.
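
       A hedged sketch of such a validation in `open`; it assumes `SinkContext` exposes the subscription type, which may not exist in the current API:

```java
// Sketch only: reject subscription modes other than failover. SinkContext#getSubscriptionType
// is an assumption here; the actual way to obtain the subscription type needs to be confirmed.
if (ctx.getSubscriptionType() != org.apache.pulsar.client.api.SubscriptionType.Failover) {
    throw new IllegalArgumentException(
            "The Kafka Connect sink adaptor requires a failover subscription to preserve ordering");
}
```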

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.connector.ConnectorContext;
+
+@Slf4j
+public class PulsarKafkaSinkContext implements ConnectorContext {
+
+    @Override
+    public void requestTaskReconfiguration() {
+        // noop;

Review comment:
       Throw an unsupported-operation exception to make the failure more explicit.
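
       For example:

```java
@Override
public void requestTaskReconfiguration() {
    // Fail loudly instead of silently ignoring the request.
    throw new UnsupportedOperationException("requestTaskReconfiguration is not supported");
}
```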

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
##########
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
+
+@Slf4j
+public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
+
+    private final Map<String, String> config;
+
+    private final OffsetBackingStore offsetStore;
+    private final String topicNamespace;
+    private final Consumer<Collection<TopicPartition>> onPartitionChange;
+    private final AtomicBoolean runRepartition = new AtomicBoolean(false);
+
+    private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets 
= new ConcurrentHashMap<>();
+
+    public PulsarKafkaSinkTaskContext(Map<String, String> config,
+                                      Consumer<Collection<TopicPartition>> 
onPartitionChange) {
+        this.config = config;
+
+        offsetStore = new PulsarOffsetBackingStore();
+        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new 
PulsarKafkaWorkerConfig(config);
+        offsetStore.configure(pulsarKafkaWorkerConfig);
+        offsetStore.start();
+
+        this.onPartitionChange = onPartitionChange;
+        this.topicNamespace = 
pulsarKafkaWorkerConfig.getString(TOPIC_NAMESPACE_CONFIG);
+    }
+
+    public void close() {
+        offsetStore.stop();
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return config;
+    }
+
+    public AtomicLong currentOffset(String topic, int partition) {
+        return currentOffset(new TopicPartition(topic, partition));
+    }
+
+    public AtomicLong currentOffset(TopicPartition topicPartition) {
+        AtomicLong offset = currentOffsets.computeIfAbsent(topicPartition, kv 
-> {
+            List<ByteBuffer> req = Lists.newLinkedList();
+            ByteBuffer key = topicPartitionAsKey(topicPartition);
+            req.add(key);
+            CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
+            offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> 
result) -> {
+                if (ex == null) {
+                    if (result != null && result.size() != 0) {
+                        Optional<ByteBuffer> val = result.entrySet().stream()
+                                .filter(entry -> entry.getKey().equals(key))
+                                .findFirst().map(entry -> entry.getValue());
+                        if (val.isPresent()) {
+                            long received = val.get().getLong();
+                            if (log.isDebugEnabled()) {
+                                log.debug("read initial offset for {} == {}", 
topicPartition, received);
+                            }
+                            offsetFuture.complete(received);
+                            return;
+                        }
+                    }
+                    offsetFuture.complete(-1L);
+                } else {
+                    offsetFuture.completeExceptionally(ex);
+                }
+            });
+
+            runRepartition.set(true);
+            try {
+                return new AtomicLong(offsetFuture.get());
+            } catch (Exception e) {
+                log.error("error getting initial state of " + 
topicPartition.toString(), e);
+                return new AtomicLong(-1L);
+            }
+        });
+        if (runRepartition.compareAndSet(true, false)) {
+            onPartitionChange.accept(currentOffsets.keySet());
+        }
+        return offset;
+    }
+
+    public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
+
+        Map<TopicPartition, OffsetAndMetadata> snapshot = 
Maps.newHashMapWithExpectedSize(currentOffsets.size());
+        currentOffsets.forEach((topicPartition, offset) -> {
+            if (offset.get() > 0) {
+                snapshot.put(topicPartition,
+                        new OffsetAndMetadata(offset.get(), Optional.empty(), 
null));
+            }
+        });
+        return snapshot;
+    }
+
+    private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) {
+        return ByteBuffer.wrap((topicNamespace + "/" + 
topicPartition.toString()).getBytes(UTF_8));
+
+    }
+
+    private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, 
TopicPartition topicPartition, long l) {
+        ByteBuffer key = topicPartitionAsKey(topicPartition);
+        ByteBuffer value = ByteBuffer.allocate(Long.BYTES);
+        value.putLong(l);
+        value.flip();
+        offsetMap.put(key, value);
+    }
+
+    @SneakyThrows
+    @Override
+    public void offset(Map<TopicPartition, Long> map) {
+        map.forEach((key, value) -> {
+            if (!currentOffsets.containsKey(key)) {
+                runRepartition.set(true);
+            }
+            currentOffsets.put(key, new AtomicLong(value));
+        });
+
+        if (runRepartition.compareAndSet(true, false)) {
+            onPartitionChange.accept(currentOffsets.keySet());
+        }
+    }
+
+    @Override
+    public void offset(TopicPartition topicPartition, long l) {

Review comment:
       Same comment as above.

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                sourceRecord.ack();
+            } else {
+                log.error("Error sending the record {}", sourceRecord, ex);
+                sourceRecord.fail();
+            }
+            throw new IllegalArgumentException();
+        });
+        numPendingRecords.incrementAndGet();
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, 
TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * 
lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws 
Exception {
+        kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not 
set");
+        topicName = kafkaSinkConfig.getTopic();
+        unwrapKeyValueIfAvailable = 
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+
+        String kafkaConnectorFQClassName = 
kafkaSinkConfig.getKafkaConnectorSinkClass();
+        kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet()
+                .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+
+        defaultKeySchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null);
+        defaultValueSchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null);
+
+        Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+        connector = (SinkConnector) clazz.getConstructor().newInstance();
+
+        Class<? extends Task> taskClass = connector.taskClass();
+        sinkContext = new PulsarKafkaSinkContext();
+        connector.initialize(sinkContext);
+        connector.start(Maps.fromProperties(props));
+
+        List<Map<String, String>> configs = connector.taskConfigs(1);
+        configs.forEach(x -> {
+            x.put(OFFSET_STORAGE_TOPIC_CONFIG, 
kafkaSinkConfig.getOffsetStorageTopic());
+            x.put(PULSAR_SERVICE_URL_CONFIG, 
kafkaSinkConfig.getPulsarServiceUrl());
+        });
+        task = (SinkTask) taskClass.getConstructor().newInstance();
+        taskContext =
+                new PulsarKafkaSinkTaskContext(configs.get(0), task::open);
+        task.initialize(taskContext);
+        task.start(configs.get(0));
+
+        batchSize = kafkaSinkConfig.getBatchSize();
+        lingerMs = kafkaSinkConfig.getLingerTimeMs();
+        scheduledExecutor.scheduleAtFixedRate(() ->
+                this.flushIfNeeded(true), lingerMs, lingerMs, 
TimeUnit.MILLISECONDS);
+
+        isRunning = true;
+        log.info("Kafka sink started : {}.", props);
+    }
+
+    private void flushIfNeeded(boolean force) {
+        if (force || numPendingRecords.get() >= batchSize) {
+            scheduledExecutor.submit(this::flush);
+        }
+    }
+
+    public void flush() {
+        if (log.isDebugEnabled()) {
+            log.debug("flush requested, pending: {}, batchSize: {}",
+                    numPendingRecords.get(), batchSize);
+        }
+
+        if (numPendingRecords.getAndSet(0) == 0) {
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> currentOffsets = 
taskContext.currentOffsets();
+        CompletableFuture<Void> flushCf;
+        synchronized (this) {
+            flushCf = pendingFlush;
+            pendingFlush = new CompletableFuture<>();
+        }
+
+        try {
+            task.flush(currentOffsets);
+            taskContext.flushOffsets(currentOffsets);
+            flushCf.complete(null);
+        } catch (Throwable t) {
+            log.error("error flushing pending records", t);
+            flushCf.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * org.apache.kafka.connect.data.Schema for the object
+     * @param obj
+     * @return org.apache.kafka.connect.data.Schema
+     */
+    public static Schema getKafkaConnectSchemaForObject(Object obj, Schema 
defaultSchema) {
+        if (obj == null) {
+            return defaultSchema;
+        }
+
+        if (primitiveTypeToSchema.containsKey(obj.getClass())) {
+            return primitiveTypeToSchema.get(obj.getClass());
+        }
+
+        // Other types are not supported yet.
+        // Will fallback to defaults provided.
+        return defaultSchema;
+    }
+
+    private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
+        final int partition = 0;
+        final Object key;
+        final Object value;
+        if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof 
KeyValue) {
+            KeyValue<Object, Object> kv = (KeyValue<Object, Object>) 
sourceRecord.getValue();
+            key = kv.getKey();
+            value = kv.getValue();
+        } else {
+            key = sourceRecord.getKey().orElse(null);
+            value = sourceRecord.getValue();
+        }
+        final Schema keySchema = getKafkaConnectSchemaForObject(key, 
defaultKeySchema);
+        final Schema valueSchema = getKafkaConnectSchemaForObject(value, 
defaultValueSchema);
+
+        long offset = taskContext.currentOffset(topicName, 
partition).incrementAndGet();
+        SinkRecord sinkRecord = new SinkRecord(topicName,
+                partition,
+                keySchema,
+                key,
+                valueSchema,
+                value,
+                offset,
+                sourceRecord.getEventTime().orElse(null),

Review comment:
       If the event time is not specified, we should consider falling back to 
the publish time.
   
   Alternatively, add a configuration option that lets users choose which 
timestamp to use. A minimal sketch of the fallback is shown below.
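
   For illustration only, a minimal sketch of that fallback. It assumes a hypothetical
   useEventTime config flag (not part of this PR) and reuses the variables from the
   hunk above; this is not the actual implementation:

       // Prefer the event time when present; otherwise fall back to the publish time.
       // Pick the TimestampType to match whichever timestamp was actually used.
       final Long timestamp;
       final TimestampType timestampType;
       if (useEventTime && sourceRecord.getEventTime().isPresent()) {
           timestamp = sourceRecord.getEventTime().get();
           timestampType = TimestampType.CREATE_TIME;
       } else {
           // Record#getMessage() is optional, so the publish time may be unavailable.
           timestamp = sourceRecord.getMessage()
                   .map(msg -> msg.getPublishTime())
                   .orElse(null);
           timestampType = (timestamp == null)
                   ? TimestampType.NO_TIMESTAMP_TYPE
                   : TimestampType.LOG_APPEND_TIME;
       }
       SinkRecord sinkRecord = new SinkRecord(topicName, partition,
               keySchema, key, valueSchema, value, offset,
               timestamp, timestampType);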

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                sourceRecord.ack();
+            } else {
+                log.error("Error sending the record {}", sourceRecord, ex);
+                sourceRecord.fail();
+            }
+            throw new IllegalArgumentException();
+        });
+        numPendingRecords.incrementAndGet();
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, 
TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * 
lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws 
Exception {
+        kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not 
set");
+        topicName = kafkaSinkConfig.getTopic();
+        unwrapKeyValueIfAvailable = 
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+
+        String kafkaConnectorFQClassName = 
kafkaSinkConfig.getKafkaConnectorSinkClass();
+        kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet()
+                .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+
+        defaultKeySchema = (Schema)Schema.class

Review comment:
       I am not sure we need the default key and value schemas here. A Pulsar 
record carries its Pulsar schema information, so we should just write a util 
function that converts the Pulsar schema to a Kafka schema. If there is no 
schema, fall back to the Kafka bytes schema. A rough sketch of such a util is 
shown below.
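
   For illustration only, a rough sketch of such a util, keyed off the Pulsar
   SchemaType (how the sink obtains the SchemaType from the incoming record is
   left out here, and structured types would still need real handling):

       // needs: import org.apache.pulsar.common.schema.SchemaType;
       // Map a Pulsar SchemaType to a Kafka Connect Schema; anything without a
       // schema, or not handled yet, falls back to the Kafka bytes schema.
       private static Schema pulsarSchemaTypeToKafkaSchema(SchemaType type) {
           if (type == null) {
               return Schema.BYTES_SCHEMA;
           }
           switch (type) {
               case BOOLEAN: return Schema.BOOLEAN_SCHEMA;
               case INT8:    return Schema.INT8_SCHEMA;
               case INT16:   return Schema.INT16_SCHEMA;
               case INT32:   return Schema.INT32_SCHEMA;
               case INT64:   return Schema.INT64_SCHEMA;
               case FLOAT:   return Schema.FLOAT32_SCHEMA;
               case DOUBLE:  return Schema.FLOAT64_SCHEMA;
               case STRING:  return Schema.STRING_SCHEMA;
               case BYTES:   return Schema.BYTES_SCHEMA;
               default:      return Schema.BYTES_SCHEMA;
           }
       }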

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+@Data
+@Accessors(chain = true)
+public class PulsarKafkaConnectSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+            defaultValue = "1",
+            help = "The batch size that Kafka producer will attempt to batch 
records together.")
+    private int batchSize = 1;

Review comment:
       Why is batchSize 1?

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {

Review comment:
       I also don't think you need to register the callback here. Instead, track 
the Pulsar source records in the context so that the "offsets" are recorded 
correctly. Once you ask the Kafka sink task to flush, you also need to flush 
the sink context. If the flush succeeds, ack the source records; otherwise fail 
them. A rough sketch of that flow is shown below.
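
   A rough sketch of that flow, with placeholder names (not the actual
   implementation); a real version would also snapshot the pending set before
   flushing so records added mid-flush are not acked prematurely:

       // needs: java.util.Deque, java.util.concurrent.ConcurrentLinkedDeque
       private final Deque<Record<Object>> pendingRecords = new ConcurrentLinkedDeque<>();

       // in write(), after task.put(...) returns without error:
       pendingRecords.add(sourceRecord);

       // in flush():
       try {
           task.flush(currentOffsets);
           taskContext.flushOffsets(currentOffsets);
           while (!pendingRecords.isEmpty()) {
               pendingRecords.poll().ack();      // flushed successfully
           }
       } catch (Throwable t) {
           while (!pendingRecords.isEmpty()) {
               pendingRecords.poll().fail();     // flush failed, fail the source records
           }
       }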

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
##########
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
+
+@Slf4j
+public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
+
+    private final Map<String, String> config;
+
+    private final OffsetBackingStore offsetStore;
+    private final String topicNamespace;
+    private final Consumer<Collection<TopicPartition>> onPartitionChange;
+    private final AtomicBoolean runRepartition = new AtomicBoolean(false);
+
+    private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets 
= new ConcurrentHashMap<>();
+
+    public PulsarKafkaSinkTaskContext(Map<String, String> config,
+                                      Consumer<Collection<TopicPartition>> 
onPartitionChange) {
+        this.config = config;
+
+        offsetStore = new PulsarOffsetBackingStore();
+        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new 
PulsarKafkaWorkerConfig(config);
+        offsetStore.configure(pulsarKafkaWorkerConfig);
+        offsetStore.start();
+
+        this.onPartitionChange = onPartitionChange;
+        this.topicNamespace = 
pulsarKafkaWorkerConfig.getString(TOPIC_NAMESPACE_CONFIG);
+    }
+
+    public void close() {
+        offsetStore.stop();
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return config;
+    }
+
+    public AtomicLong currentOffset(String topic, int partition) {
+        return currentOffset(new TopicPartition(topic, partition));
+    }
+
+    public AtomicLong currentOffset(TopicPartition topicPartition) {
+        AtomicLong offset = currentOffsets.computeIfAbsent(topicPartition, kv 
-> {
+            List<ByteBuffer> req = Lists.newLinkedList();
+            ByteBuffer key = topicPartitionAsKey(topicPartition);
+            req.add(key);
+            CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
+            offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> 
result) -> {
+                if (ex == null) {
+                    if (result != null && result.size() != 0) {
+                        Optional<ByteBuffer> val = result.entrySet().stream()
+                                .filter(entry -> entry.getKey().equals(key))
+                                .findFirst().map(entry -> entry.getValue());
+                        if (val.isPresent()) {
+                            long received = val.get().getLong();
+                            if (log.isDebugEnabled()) {
+                                log.debug("read initial offset for {} == {}", 
topicPartition, received);
+                            }
+                            offsetFuture.complete(received);
+                            return;
+                        }
+                    }
+                    offsetFuture.complete(-1L);
+                } else {
+                    offsetFuture.completeExceptionally(ex);
+                }
+            });
+
+            runRepartition.set(true);
+            try {
+                return new AtomicLong(offsetFuture.get());
+            } catch (Exception e) {
+                log.error("error getting initial state of " + 
topicPartition.toString(), e);
+                return new AtomicLong(-1L);
+            }
+        });
+        if (runRepartition.compareAndSet(true, false)) {
+            onPartitionChange.accept(currentOffsets.keySet());
+        }
+        return offset;
+    }
+
+    public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
+
+        Map<TopicPartition, OffsetAndMetadata> snapshot = 
Maps.newHashMapWithExpectedSize(currentOffsets.size());
+        currentOffsets.forEach((topicPartition, offset) -> {
+            if (offset.get() > 0) {
+                snapshot.put(topicPartition,
+                        new OffsetAndMetadata(offset.get(), Optional.empty(), 
null));
+            }
+        });
+        return snapshot;
+    }
+
+    private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) {
+        return ByteBuffer.wrap((topicNamespace + "/" + 
topicPartition.toString()).getBytes(UTF_8));
+
+    }
+
+    private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, 
TopicPartition topicPartition, long l) {
+        ByteBuffer key = topicPartitionAsKey(topicPartition);
+        ByteBuffer value = ByteBuffer.allocate(Long.BYTES);
+        value.putLong(l);
+        value.flip();
+        offsetMap.put(key, value);
+    }
+
+    @SneakyThrows
+    @Override
+    public void offset(Map<TopicPartition, Long> map) {

Review comment:
       This should be implemented. If the offset (messageId) is reset, you need 
to reset the cursor of the input topic accordingly; see the sketch below.
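
   For illustration only, a rough sketch of the idea. It assumes the context is
   handed a Pulsar Consumer for the input topic (inputConsumer here, which does
   not exist in this PR) and that the Kafka-style offset was originally derived
   from the MessageId via MessageIdUtils, so it can be converted back:

       // needs: org.apache.pulsar.client.api.*, org.apache.pulsar.client.impl.MessageIdUtils
       @Override
       public void offset(Map<TopicPartition, Long> map) {
           map.forEach((topicPartition, offset) -> {
               MessageId messageId = MessageIdUtils.getMessageId(offset);
               try {
                   inputConsumer.seek(messageId);   // reset the cursor of the input topic
               } catch (PulsarClientException e) {
                   throw new RuntimeException("failed to reset cursor to " + messageId, e);
               }
               currentOffsets.put(topicPartition, new AtomicLong(offset));
           });
           onPartitionChange.accept(currentOffsets.keySet());
       }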

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
##########
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
+
+@Slf4j
+public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
+
+    private final Map<String, String> config;
+
+    private final OffsetBackingStore offsetStore;
+    private final String topicNamespace;
+    private final Consumer<Collection<TopicPartition>> onPartitionChange;
+    private final AtomicBoolean runRepartition = new AtomicBoolean(false);
+
+    private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets 
= new ConcurrentHashMap<>();

Review comment:
       The offset management here is completely wrong. It would result in a lot 
of duplicates and inconsistent behavior when the sink connector fails. You 
shouldn't be responsible for assigning offsets; the offsets come from the 
Pulsar records. What you should manage here are the "offsets" (message IDs) of 
the records the Pulsar sink receives from the input topic. See the sketch 
below.
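
   For illustration, a sketch of deriving the offset from the Pulsar record
   itself rather than a locally incremented counter (placeholder helper, not the
   actual implementation); MessageIdUtils here is the ledgerId/entryId-packing
   helper from pulsar-client:

       // needs: org.apache.pulsar.client.impl.MessageIdUtils
       private static long offsetOf(Record<Object> sourceRecord) {
           // Prefer the record sequence if the source provides one...
           if (sourceRecord.getRecordSequence().isPresent()) {
               return sourceRecord.getRecordSequence().get();
           }
           // ...otherwise derive an offset from the MessageId of the underlying message.
           return sourceRecord.getMessage()
                   .map(msg -> MessageIdUtils.getOffset(msg.getMessageId()))
                   .orElseThrow(() -> new IllegalStateException(
                           "record carries neither a sequence nor a message id"));
       }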

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<Object> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    private PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private Schema defaultKeySchema;
+    private Schema defaultValueSchema;
+
+
+    private int batchSize;
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final AtomicInteger numPendingRecords = new AtomicInteger(0);
+
+    private volatile CompletableFuture<Void> pendingFlush = new 
CompletableFuture<>();
+    private volatile boolean isRunning = false;
+
+    private Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<Object> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.error("Sink is stopped. Cannot send the record {}", 
sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlush.whenComplete((ignore, ex) -> {
+            if (ex == null) {
+                sourceRecord.ack();
+            } else {
+                log.error("Error sending the record {}", sourceRecord, ex);
+                sourceRecord.fail();
+            }
+            throw new IllegalArgumentException();
+        });
+        numPendingRecords.incrementAndGet();
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, 
TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * 
lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws 
Exception {
+        kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not 
set");
+        topicName = kafkaSinkConfig.getTopic();
+        unwrapKeyValueIfAvailable = 
kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+
+        String kafkaConnectorFQClassName = 
kafkaSinkConfig.getKafkaConnectorSinkClass();
+        kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet()
+                .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
+
+        defaultKeySchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultKeySchema()).get(null);
+        defaultValueSchema = (Schema)Schema.class
+                .getField(kafkaSinkConfig.getDefaultValueSchema()).get(null);
+
+        Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+        connector = (SinkConnector) clazz.getConstructor().newInstance();
+
+        Class<? extends Task> taskClass = connector.taskClass();
+        sinkContext = new PulsarKafkaSinkContext();
+        connector.initialize(sinkContext);
+        connector.start(Maps.fromProperties(props));
+
+        List<Map<String, String>> configs = connector.taskConfigs(1);
+        configs.forEach(x -> {
+            x.put(OFFSET_STORAGE_TOPIC_CONFIG, 
kafkaSinkConfig.getOffsetStorageTopic());
+            x.put(PULSAR_SERVICE_URL_CONFIG, 
kafkaSinkConfig.getPulsarServiceUrl());
+        });
+        task = (SinkTask) taskClass.getConstructor().newInstance();
+        taskContext =
+                new PulsarKafkaSinkTaskContext(configs.get(0), task::open);
+        task.initialize(taskContext);
+        task.start(configs.get(0));
+
+        batchSize = kafkaSinkConfig.getBatchSize();
+        lingerMs = kafkaSinkConfig.getLingerTimeMs();
+        scheduledExecutor.scheduleAtFixedRate(() ->
+                this.flushIfNeeded(true), lingerMs, lingerMs, 
TimeUnit.MILLISECONDS);
+
+        isRunning = true;
+        log.info("Kafka sink started : {}.", props);
+    }
+
+    private void flushIfNeeded(boolean force) {
+        if (force || numPendingRecords.get() >= batchSize) {
+            scheduledExecutor.submit(this::flush);
+        }
+    }
+
+    public void flush() {
+        if (log.isDebugEnabled()) {
+            log.debug("flush requested, pending: {}, batchSize: {}",
+                    numPendingRecords.get(), batchSize);
+        }
+
+        if (numPendingRecords.getAndSet(0) == 0) {
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> currentOffsets = 
taskContext.currentOffsets();
+        CompletableFuture<Void> flushCf;
+        synchronized (this) {
+            flushCf = pendingFlush;
+            pendingFlush = new CompletableFuture<>();
+        }
+
+        try {
+            task.flush(currentOffsets);
+            taskContext.flushOffsets(currentOffsets);
+            flushCf.complete(null);
+        } catch (Throwable t) {
+            log.error("error flushing pending records", t);
+            flushCf.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * org.apache.kafka.connect.data.Schema for the object
+     * @param obj
+     * @return org.apache.kafka.connect.data.Schema
+     */
+    public static Schema getKafkaConnectSchemaForObject(Object obj, Schema 
defaultSchema) {
+        if (obj == null) {
+            return defaultSchema;
+        }
+
+        if (primitiveTypeToSchema.containsKey(obj.getClass())) {
+            return primitiveTypeToSchema.get(obj.getClass());
+        }
+
+        // Other types are not supported yet.
+        // Will fallback to defaults provided.
+        return defaultSchema;
+    }
+
+    private SinkRecord toSinkRecord(Record<Object> sourceRecord) {
+        final int partition = 0;
+        final Object key;
+        final Object value;
+        if (unwrapKeyValueIfAvailable && sourceRecord.getValue() instanceof 
KeyValue) {
+            KeyValue<Object, Object> kv = (KeyValue<Object, Object>) 
sourceRecord.getValue();
+            key = kv.getKey();
+            value = kv.getValue();
+        } else {
+            key = sourceRecord.getKey().orElse(null);
+            value = sourceRecord.getValue();
+        }
+        final Schema keySchema = getKafkaConnectSchemaForObject(key, 
defaultKeySchema);
+        final Schema valueSchema = getKafkaConnectSchemaForObject(value, 
defaultValueSchema);
+
+        long offset = taskContext.currentOffset(topicName, 
partition).incrementAndGet();
+        SinkRecord sinkRecord = new SinkRecord(topicName,
+                partition,
+                keySchema,
+                key,
+                valueSchema,
+                value,
+                offset,
+                sourceRecord.getEventTime().orElse(null),
+                TimestampType.NO_TIMESTAMP_TYPE);

Review comment:
       The timestamp type should be determined based on which timestamp is 
actually used (see the earlier comment about event time vs. publish time).

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
##########
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
+
+@Slf4j
+public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
+
+    private final Map<String, String> config;
+
+    private final OffsetBackingStore offsetStore;
+    private final String topicNamespace;
+    private final Consumer<Collection<TopicPartition>> onPartitionChange;
+    private final AtomicBoolean runRepartition = new AtomicBoolean(false);
+
+    private final ConcurrentHashMap<TopicPartition, AtomicLong> currentOffsets 
= new ConcurrentHashMap<>();
+
+    public PulsarKafkaSinkTaskContext(Map<String, String> config,
+                                      Consumer<Collection<TopicPartition>> 
onPartitionChange) {
+        this.config = config;
+
+        offsetStore = new PulsarOffsetBackingStore();
+        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new 
PulsarKafkaWorkerConfig(config);
+        offsetStore.configure(pulsarKafkaWorkerConfig);
+        offsetStore.start();
+
+        this.onPartitionChange = onPartitionChange;
+        this.topicNamespace = 
pulsarKafkaWorkerConfig.getString(TOPIC_NAMESPACE_CONFIG);
+    }
+
+    public void close() {
+        offsetStore.stop();
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return config;
+    }
+
+    public AtomicLong currentOffset(String topic, int partition) {
+        return currentOffset(new TopicPartition(topic, partition));
+    }
+
+    public AtomicLong currentOffset(TopicPartition topicPartition) {
+        AtomicLong offset = currentOffsets.computeIfAbsent(topicPartition, kv 
-> {
+            List<ByteBuffer> req = Lists.newLinkedList();
+            ByteBuffer key = topicPartitionAsKey(topicPartition);
+            req.add(key);
+            CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
+            offsetStore.get(req, (Throwable ex, Map<ByteBuffer, ByteBuffer> 
result) -> {
+                if (ex == null) {
+                    if (result != null && result.size() != 0) {
+                        Optional<ByteBuffer> val = result.entrySet().stream()
+                                .filter(entry -> entry.getKey().equals(key))
+                                .findFirst().map(entry -> entry.getValue());
+                        if (val.isPresent()) {
+                            long received = val.get().getLong();
+                            if (log.isDebugEnabled()) {
+                                log.debug("read initial offset for {} == {}", 
topicPartition, received);
+                            }
+                            offsetFuture.complete(received);
+                            return;
+                        }
+                    }
+                    offsetFuture.complete(-1L);
+                } else {
+                    offsetFuture.completeExceptionally(ex);
+                }
+            });
+
+            runRepartition.set(true);
+            try {
+                return new AtomicLong(offsetFuture.get());
+            } catch (Exception e) {
+                log.error("error getting initial state of " + 
topicPartition.toString(), e);
+                return new AtomicLong(-1L);
+            }
+        });
+        if (runRepartition.compareAndSet(true, false)) {
+            onPartitionChange.accept(currentOffsets.keySet());
+        }
+        return offset;
+    }
+
+    public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
+
+        Map<TopicPartition, OffsetAndMetadata> snapshot = 
Maps.newHashMapWithExpectedSize(currentOffsets.size());
+        currentOffsets.forEach((topicPartition, offset) -> {
+            if (offset.get() > 0) {
+                snapshot.put(topicPartition,
+                        new OffsetAndMetadata(offset.get(), Optional.empty(), 
null));
+            }
+        });
+        return snapshot;
+    }
+
+    private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) {
+        return ByteBuffer.wrap((topicNamespace + "/" + 
topicPartition.toString()).getBytes(UTF_8));
+
+    }
+
+    private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, 
TopicPartition topicPartition, long l) {
+        ByteBuffer key = topicPartitionAsKey(topicPartition);
+        ByteBuffer value = ByteBuffer.allocate(Long.BYTES);
+        value.putLong(l);
+        value.flip();
+        offsetMap.put(key, value);
+    }
+
+    @SneakyThrows
+    @Override
+    public void offset(Map<TopicPartition, Long> map) {
+        map.forEach((key, value) -> {
+            if (!currentOffsets.containsKey(key)) {
+                runRepartition.set(true);
+            }
+            currentOffsets.put(key, new AtomicLong(value));
+        });
+
+        if (runRepartition.compareAndSet(true, false)) {
+            onPartitionChange.accept(currentOffsets.keySet());
+        }
+    }
+
+    @Override
+    public void offset(TopicPartition topicPartition, long l) {
+        Map<TopicPartition, Long> map = Maps.newHashMap();
+        map.put(topicPartition, l);
+        this.offset(map);
+    }
+
+    @Override
+    public void timeout(long l) {

Review comment:
       You should implement this. Kafka Connect tasks call timeout() to ask the 
framework to wait before retrying the current batch after a retriable failure; 
a minimal sketch is shown below.
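
   A minimal sketch (placeholder names, not the actual implementation):

       private final AtomicLong pendingTimeoutMs = new AtomicLong(-1L);

       @Override
       public void timeout(long timeoutMs) {
           pendingTimeoutMs.set(timeoutMs);
       }

       // the sink consumes (and clears) the requested backoff before re-calling task.put()
       public long getAndResetTimeout() {
           return pendingTimeoutMs.getAndSet(-1L);
       }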




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

