This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ab1e28  C API wrapper based on C++ client library (#1736)
1ab1e28 is described below

commit 1ab1e28c2c419b50ca7109073904510f8aa0b641
Author: Matteo Merli <[email protected]>
AuthorDate: Sun May 6 15:42:19 2018 -0700

    C API wrapper based on C++ client library (#1736)
    
    ### Motivation
    
    Added plain C API for Pulsar client.
    
    This is implemented as a super-thin wrapper on top of existing C++ client 
library. The same `libpulsar.so` will contain both implementations.
    
    Primary use case for this API is to enable wrapping Pulsar client library 
in higher level languages that have facilities to convert C APIs but not C++ 
code. One such example is Go language, that with `cgo` can generate bindings 
for C headers.
    
    Added some working examples for how to use Pulsar C API and I'll add unit 
tests directly in Go, to cover both wrappers at once.
---
 pulsar-client-cpp/.gitignore                       |   4 +
 pulsar-client-cpp/examples/CMakeLists.txt          |  24 +++
 pulsar-client-cpp/examples/SampleConsumerCApi.c    |  60 +++++++
 .../examples/SampleConsumerListenerCApi.c          |  59 +++++++
 pulsar-client-cpp/examples/SampleProducerCApi.c    |  63 +++++++
 pulsar-client-cpp/examples/SampleReaderCApi.c      |  59 +++++++
 .../include/pulsar/c/authentication.h              |  35 ++++
 pulsar-client-cpp/include/pulsar/c/client.h        | 133 ++++++++++++++
 .../include/pulsar/c/client_configuration.h        | 144 +++++++++++++++
 pulsar-client-cpp/include/pulsar/c/consumer.h      | 194 +++++++++++++++++++++
 .../include/pulsar/c/consumer_configuration.h      | 163 +++++++++++++++++
 pulsar-client-cpp/include/pulsar/c/message.h       | 168 ++++++++++++++++++
 pulsar-client-cpp/include/pulsar/c/message_id.h    |  53 ++++++
 .../include/pulsar/c/message_router.h              |  36 ++++
 pulsar-client-cpp/include/pulsar/c/producer.h      | 119 +++++++++++++
 .../include/pulsar/c/producer_configuration.h      | 147 ++++++++++++++++
 pulsar-client-cpp/include/pulsar/c/reader.h        |  69 ++++++++
 .../include/pulsar/c/reader_configuration.h        |  82 +++++++++
 pulsar-client-cpp/include/pulsar/c/result.h        |  81 +++++++++
 pulsar-client-cpp/lib/CMakeLists.txt               |   2 +-
 pulsar-client-cpp/lib/c/c_Authentication.cc        |  33 ++++
 pulsar-client-cpp/lib/c/c_Client.cc                | 142 +++++++++++++++
 pulsar-client-cpp/lib/c/c_ClientConfiguration.cc   | 115 ++++++++++++
 pulsar-client-cpp/lib/c/c_Consumer.cc              | 122 +++++++++++++
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 104 +++++++++++
 pulsar-client-cpp/lib/c/c_Message.cc               |  96 ++++++++++
 pulsar-client-cpp/lib/c/c_MessageId.cc             |  58 ++++++
 pulsar-client-cpp/lib/c/c_MessageRouter.cc         |  26 +++
 pulsar-client-cpp/lib/c/c_Producer.cc              |  62 +++++++
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 175 +++++++++++++++++++
 pulsar-client-cpp/lib/c/c_Reader.cc                |  54 ++++++
 pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc   |  78 +++++++++
 pulsar-client-cpp/lib/c/c_Result.cc                |  23 +++
 pulsar-client-cpp/lib/c/c_structs.h                |  80 +++++++++
 34 files changed, 2862 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index c660cd1..e14b081 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -35,9 +35,13 @@ lib*.so*
 
 # Exclude compiled executables
 /examples/SampleProducer
+/examples/SampleProducerCApi
 /examples/SampleConsumer
+/examples/SampleConsumerCApi
 /examples/SampleAsyncProducer
 /examples/SampleConsumerListener
+/examples/SampleConsumerListenerCApi
+/examples/SampleReaderCApi
 /tests/main
 /perf/perfProducer
 /perf/perfConsumer
diff --git a/pulsar-client-cpp/examples/CMakeLists.txt 
b/pulsar-client-cpp/examples/CMakeLists.txt
index fe5eb6e..6459b60 100644
--- a/pulsar-client-cpp/examples/CMakeLists.txt
+++ b/pulsar-client-cpp/examples/CMakeLists.txt
@@ -33,12 +33,36 @@ set(SAMPLE_PRODUCER_SOURCES
   SampleProducer.cc
 )
 
+set(SAMPLE_PRODUCER_C_SOURCES
+    SampleProducerCApi.c
+)
+
+set(SAMPLE_CONSUMER_C_SOURCES
+    SampleConsumerCApi.c
+)
+
+set(SAMPLE_CONSUMER_LISTENER_C_SOURCES
+    SampleConsumerListenerCApi.c
+)
+
+set(SAMPLE_READER_C_SOURCES
+        SampleReaderCApi.c
+)
+
 add_executable(SampleAsyncProducer    ${SAMPLE_ASYNC_PRODUCER_SOURCES})
 add_executable(SampleConsumer         ${SAMPLE_CONSUMER_SOURCES})
 add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
 add_executable(SampleProducer         ${SAMPLE_PRODUCER_SOURCES})
+add_executable(SampleProducerCApi         ${SAMPLE_PRODUCER_C_SOURCES})
+add_executable(SampleConsumerCApi         ${SAMPLE_CONSUMER_C_SOURCES})
+add_executable(SampleConsumerListenerCApi         
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
+add_executable(SampleReaderCApi         ${SAMPLE_READER_C_SOURCES})
 
 target_link_libraries(SampleAsyncProducer    ${CLIENT_LIBS})
 target_link_libraries(SampleConsumer         ${CLIENT_LIBS})
 target_link_libraries(SampleConsumerListener ${CLIENT_LIBS})
 target_link_libraries(SampleProducer         ${CLIENT_LIBS})
+target_link_libraries(SampleProducerCApi     ${CLIENT_LIBS})
+target_link_libraries(SampleConsumerCApi     ${CLIENT_LIBS})
+target_link_libraries(SampleConsumerListenerCApi     ${CLIENT_LIBS})
+target_link_libraries(SampleReaderCApi     ${CLIENT_LIBS})
\ No newline at end of file
diff --git a/pulsar-client-cpp/examples/SampleConsumerCApi.c 
b/pulsar-client-cpp/examples/SampleConsumerCApi.c
new file mode 100644
index 0000000..97b074d
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumerCApi.c
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", 
conf);
+
+    pulsar_consumer_configuration_t *consumer_conf = 
pulsar_consumer_configuration_create();
+    pulsar_consumer_configuration_set_consumer_type(consumer_conf, 
pulsar_ConsumerShared);
+
+    pulsar_consumer_t *consumer;
+    pulsar_result res = pulsar_client_subscribe(client, "my-topic", 
"my-subscrition", consumer_conf, &consumer);
+    if (res != pulsar_result_Ok) {
+        printf("Failed to create subscribe to topic: %s\n", 
pulsar_result_str(res));
+        return 1;
+    }
+
+    for (;;) {
+        pulsar_message_t *message;
+        res = pulsar_consumer_receive(consumer, &message);
+        if (res != pulsar_result_Ok) {
+            printf("Failed to receive message: %s\n", pulsar_result_str(res));
+            return 1;
+        }
+
+        printf("Received message with payload: '%.*s'\n", 
pulsar_message_get_length(message),
+               pulsar_message_get_data(message));
+
+        pulsar_consumer_acknowledge(consumer, message);
+        pulsar_message_free(message);
+    }
+
+    // Cleanup
+    pulsar_consumer_close(consumer);
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c 
b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
new file mode 100644
index 0000000..8f3ed0d
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* 
message) {
+    printf("Received message with payload: '%.*s'\n", 
pulsar_message_get_length(message),
+           pulsar_message_get_data(message));
+
+    pulsar_consumer_acknowledge(consumer, message);
+    pulsar_message_free(message);
+}
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", 
conf);
+
+    pulsar_consumer_configuration_t *consumer_conf = 
pulsar_consumer_configuration_create();
+    pulsar_consumer_configuration_set_consumer_type(consumer_conf, 
pulsar_ConsumerShared);
+    pulsar_consumer_configuration_set_message_listener(consumer_conf, 
listener_callback);
+
+    pulsar_consumer_t *consumer;
+    pulsar_result res = pulsar_client_subscribe(client, "my-topic", 
"my-subscrition", consumer_conf, &consumer);
+    if (res != pulsar_result_Ok) {
+        printf("Failed to create subscribe to topic: %s\n", 
pulsar_result_str(res));
+        return 1;
+    }
+
+    // Block main thread
+    fgetc(stdin);
+
+    printf("\nClosing consumer\n");
+
+    // Cleanup
+    pulsar_consumer_close(consumer);
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/examples/SampleProducerCApi.c 
b/pulsar-client-cpp/examples/SampleProducerCApi.c
new file mode 100644
index 0000000..74853e2
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleProducerCApi.c
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client.h>
+
+#include <stdio.h>
+#include <string.h>
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", 
conf);
+
+    pulsar_producer_configuration_t* producer_conf = 
pulsar_producer_configuration_create();
+    pulsar_producer_configuration_set_batching_enabled(producer_conf, 1);
+    pulsar_producer_t *producer;
+
+    pulsar_result err = pulsar_client_create_producer(client, "my-topic", 
producer_conf, &producer);
+    if (err != pulsar_result_Ok) {
+        printf("Failed to create producer: %s\n", pulsar_result_str(err));
+        return 1;
+    }
+
+    for (int i = 0; i < 10; i++) {
+        const char* data = "my-content";
+        pulsar_message_t* message = pulsar_message_create();
+        pulsar_message_set_content(message, data, strlen(data));
+
+        err = pulsar_producer_send(producer, message);
+        if (err == pulsar_result_Ok) {
+            printf("Sent message %d\n", i);
+        } else {
+            printf("Failed to publish message: %s\n", pulsar_result_str(err));
+            return 1;
+        }
+
+        pulsar_message_free(message);
+    }
+
+    // Cleanup
+    pulsar_producer_close(producer);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/examples/SampleReaderCApi.c 
b/pulsar-client-cpp/examples/SampleReaderCApi.c
new file mode 100644
index 0000000..c0eec06
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleReaderCApi.c
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", 
conf);
+
+    pulsar_reader_configuration_t *reader_conf = 
pulsar_reader_configuration_create();
+
+    pulsar_reader_t *reader;
+    pulsar_result res = pulsar_client_create_reader(client, "my-topic", 
pulsar_message_id_earliest(), reader_conf,
+                                                    &reader);
+    if (res != pulsar_result_Ok) {
+        printf("Failed to create reader: %s\n", pulsar_result_str(res));
+        return 1;
+    }
+
+    for (;;) {
+        pulsar_message_t *message;
+        res = pulsar_reader_read_next(reader, &message);
+        if (res != pulsar_result_Ok) {
+            printf("Failed to read message: %s\n", pulsar_result_str(res));
+            return 1;
+        }
+
+        printf("Received message with payload: '%.*s'\n", 
pulsar_message_get_length(message),
+               pulsar_message_get_data(message));
+
+        pulsar_message_free(message);
+    }
+
+    // Cleanup
+    pulsar_reader_close(reader);
+    pulsar_reader_free(reader);
+    pulsar_reader_configuration_free(reader_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h 
b/pulsar-client-cpp/include/pulsar/c/authentication.h
new file mode 100644
index 0000000..6f405c2
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/authentication.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_authentication pulsar_authentication_t;
+
+pulsar_authentication_t *pulsar_authentication_create(const char 
*dynamicLibPath,
+                                                      const char 
*authParamsString);
+
+void pulsar_authentication_free(pulsar_authentication_t *authentication);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h 
b/pulsar-client-cpp/include/pulsar/c/client.h
new file mode 100644
index 0000000..11320e7
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/c/client_configuration.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/message_id.h>
+#include <pulsar/c/producer.h>
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/reader.h>
+#include <pulsar/c/consumer_configuration.h>
+#include <pulsar/c/producer_configuration.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/c/result.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_client pulsar_client_t;
+typedef struct _pulsar_producer pulsar_producer_t;
+
+typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
+typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
+
+typedef void (*pulsar_create_producer_callback)(pulsar_result result, 
pulsar_producer_t *producer);
+
+typedef void (*pulsar_subscribe_callback)(pulsar_result result, 
pulsar_consumer_t *consumer);
+typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t 
*reader);
+
+typedef void (*pulsar_close_callback)(pulsar_result result);
+
+/**
+ * Create a Pulsar client object connecting to the specified cluster address 
and using the specified
+ * configuration.
+ *
+ * @param serviceUrl the Pulsar endpoint to use (eg: 
pulsar://broker-example.com:6650)
+ * @param clientConfiguration the client configuration to use
+ */
+pulsar_client_t *pulsar_client_create(const char *serviceUrl,
+                                      const pulsar_client_configuration_t 
*clientConfiguration);
+
+/**
+ * Create a producer with default configuration
+ *
+ * @see createProducer(const std::string&, const ProducerConfiguration&, 
Producer&)
+ *
+ * @param topic the topic where the new producer will publish
+ * @param producer a non-const reference where the new producer will be copied
+ * @return ResultOk if the producer has been successfully created
+ * @return ResultError if there was an error
+ */
+pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const 
char *topic,
+                                            const 
pulsar_producer_configuration_t *conf,
+                                            pulsar_producer_t **producer);
+
+void pulsar_client_create_producer_async(pulsar_client_t *client, const char 
*topic,
+                                         const pulsar_producer_configuration_t 
*conf,
+                                         pulsar_create_producer_callback 
callback);
+
+pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char 
*topic,
+                                      const char *subscriptionName,
+                                      const pulsar_consumer_configuration_t 
*conf,
+                                      pulsar_consumer_t **consumer);
+
+void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, 
const char *subscriptionName,
+                                   const pulsar_consumer_configuration_t 
*conf, pulsar_consumer_t **consumer,
+                                   pulsar_subscribe_callback callback);
+
+/**
+ * Create a topic reader with given {@code ReaderConfiguration} for reading 
messages from the specified
+ * topic.
+ * <p>
+ * The Reader provides a low-level abstraction that allows for manual 
positioning in the topic, without
+ * using a
+ * subscription. Reader can only work on non-partitioned topics.
+ * <p>
+ * The initial reader positioning is done by specifying a message id. The 
options are:
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Start reading from the earliest 
message available in the topic
+ * <li><code>MessageId.latest</code> : Start reading from the end topic, only 
getting messages published
+ * after the
+ * reader was created
+ * <li><code>MessageId</code> : When passing a particular message id, the 
reader will position itself on
+ * that
+ * specific position. The first message to be read will be the message next to 
the specified messageId.
+ * </ul>
+ *
+ * @param topic
+ *            The name of the topic where to read
+ * @param startMessageId
+ *            The message id where the reader will position itself. The first 
message returned will be the
+ * one after
+ *            the specified startMessageId
+ * @param conf
+ *            The {@code ReaderConfiguration} object
+ * @return The {@code Reader} object
+ */
+pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char 
*topic,
+                                          const pulsar_message_id_t 
*startMessageId,
+                                          pulsar_reader_configuration_t *conf, 
pulsar_reader_t **reader);
+
+void pulsar_client_create_reader_async(pulsar_client_t *client, const char 
*topic,
+                                       const pulsar_message_id_t 
*startMessageId,
+                                       pulsar_reader_configuration_t *conf, 
pulsar_reader_t **reader,
+                                       pulsar_reader_callback callback);
+
+pulsar_result pulsar_client_close(pulsar_client_t *client);
+
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback 
callback);
+
+void pulsar_client_free(pulsar_client_t *client);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/client_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/client_configuration.h
new file mode 100644
index 0000000..47f78db
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/client_configuration.h
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
+typedef struct _pulsar_authentication pulsar_authentication_t;
+
+pulsar_client_configuration_t *pulsar_client_configuration_create();
+
+void pulsar_client_configuration_free(pulsar_client_configuration_t *conf);
+
+/**
+ * Set the authentication method to be used with the broker
+ *
+ * @param authentication the authentication data to use
+ */
+void pulsar_client_configuration_set_auth(pulsar_client_configuration_t *conf,
+                                          pulsar_authentication_t 
*authentication);
+
+/**
+ * Set timeout on client operations (subscribe, create producer, close, 
unsubscribe)
+ * Default is 30 seconds.
+ *
+ * @param timeout the timeout after which the operation will be considered as 
failed
+ */
+void 
pulsar_client_configuration_set_operation_timeout_seconds(pulsar_client_configuration_t
 *conf,
+                                                               int timeout);
+
+/**
+ * @return the client operations timeout in seconds
+ */
+int 
pulsar_client_configuration_get_operation_timeout_seconds(pulsar_client_configuration_t
 *conf);
+
+/**
+ * Set the number of IO threads to be used by the Pulsar client. Default is 1
+ * thread.
+ *
+ * @param threads number of threads
+ */
+void pulsar_client_configuration_set_io_threads(pulsar_client_configuration_t 
*conf, int threads);
+
+/**
+ * @return the number of IO threads to use
+ */
+int pulsar_client_configuration_get_io_threads(pulsar_client_configuration_t 
*conf);
+
+/**
+ * Set the number of threads to be used by the Pulsar client when delivering 
messages
+ * through message listener. Default is 1 thread per Pulsar client.
+ *
+ * If using more than 1 thread, messages for distinct MessageListener will be
+ * delivered in different threads, however a single MessageListener will always
+ * be assigned to the same thread.
+ *
+ * @param threads number of threads
+ */
+void 
pulsar_client_configuration_set_message_listener_threads(pulsar_client_configuration_t
 *conf,
+                                                              int threads);
+
+/**
+ * @return the number of IO threads to use
+ */
+int 
pulsar_client_configuration_get_message_listener_threads(pulsar_client_configuration_t
 *conf);
+
+/**
+ * Number of concurrent lookup-requests allowed on each broker-connection to 
prevent overload on broker.
+ * <i>(default: 50000)</i> It should be configured with higher value only in 
case of it requires to
+ * produce/subscribe on
+ * thousands of topic using created {@link PulsarClient}
+ *
+ * @param concurrentLookupRequest
+ */
+void 
pulsar_client_configuration_set_concurrent_lookup_request(pulsar_client_configuration_t
 *conf,
+                                                               int 
concurrentLookupRequest);
+
+/**
+ * @return Get configured total allowed concurrent lookup-request.
+ */
+int 
pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_configuration_t
 *conf);
+
+/**
+ * Initialize the log configuration
+ *
+ * @param logConfFilePath  path of the configuration file
+ */
+void 
pulsar_client_configuration_set_log_conf_file_path(pulsar_client_configuration_t
 *conf,
+                                                        const char 
*logConfFilePath);
+
+/**
+ * Get the path of log configuration file (log4cpp)
+ */
+const char 
*pulsar_client_configuration_get_log_conf_file_path(pulsar_client_configuration_t
 *conf);
+
+void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t 
*conf, int useTls);
+
+int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t 
*conf);
+
+void 
pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_configuration_t
 *conf,
+                                                               const char 
*tlsTrustCertsFilePath);
+
+const char 
*pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t
 *conf);
+
+void 
pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t
 *conf,
+                                                                   int 
allowInsecure);
+
+int 
pulsar_client_configuration_is_tls_allow_insecure_connection(pulsar_client_configuration_t
 *conf);
+
+/*
+ * Initialize stats interval in seconds. Stats are printed and reset after 
every 'statsIntervalInSeconds'.
+ * Set to 0 in order to disable stats collection.
+ */
+void 
pulsar_client_configuration_set_stats_interval_in_seconds(pulsar_client_configuration_t
 *conf,
+                                                               const unsigned 
int interval);
+
+/*
+ * Get the stats interval set in the client.
+ */
+const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds(
+    pulsar_client_configuration_t *conf);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h 
b/pulsar-client-cpp/include/pulsar/c/consumer.h
new file mode 100644
index 0000000..59c99e3
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#include <stdint.h>
+
+typedef struct _pulsar_consumer pulsar_consumer_t;
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+/**
+ * @return the topic this consumer is subscribed to
+ */
+const char *pulsar_consumer_get_topic(pulsar_consumer_t *consumer);
+
+/**
+ * @return the consumer name
+ */
+const char *pulsar_consumer_get_subscription_name(pulsar_consumer_t *consumer);
+
+/**
+ * Unsubscribe the current consumer from the topic.
+ *
+ * This method will block until the operation is completed. Once the consumer 
is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @see asyncUnsubscribe
+ * @return Result::ResultOk if the unsubscribe operation completed successfully
+ * @return Result::ResultError if the unsubscribe operation failed
+ */
+pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer);
+
+/**
+ * Asynchronously unsubscribe the current consumer from the topic.
+ *
+ * This method will block until the operation is completed. Once the consumer 
is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @param callback the callback to get notified when the operation is complete
+ */
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, 
pulsar_result_callback callback);
+
+/**
+ * Receive a single message.
+ *
+ * If a message is not immediately available, this method will block until a 
new
+ * message is available.
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @return ResultOk when a message is received
+ * @return ResultInvalidConfiguration if a message listener had been set in 
the configuration
+ */
+pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, 
pulsar_message_t **msg);
+
+/**
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @param timeoutMs the receive timeout in milliseconds
+ * @return ResultOk if a message was received
+ * @return ResultTimeout if the receive timeout was triggered
+ * @return ResultInvalidConfiguration if a message listener had been set in 
the configuration
+ */
+pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t 
*consumer, pulsar_message_t **msg,
+                                                   int timeoutMs);
+
+/**
+ * Acknowledge the reception of a single message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the message will not be re-delivered to this consumer.
+ *
+ * @see asyncAcknowledge
+ * @param message the message to acknowledge
+ * @return ResultOk if the message was successfully acknowledged
+ * @return ResultError if there was a failure
+ */
+pulsar_result pulsar_consumer_acknowledge(pulsar_consumer_t *consumer, 
pulsar_message_t *message);
+
+pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, 
pulsar_message_id_t *messageId);
+
+/**
+ * Asynchronously acknowledge the reception of a single message.
+ *
+ * This method will initiate the operation and return immediately. The 
provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been 
acknowledged
+ */
+void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, 
pulsar_message_t *message,
+                                       pulsar_result_callback callback);
+
+void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, 
pulsar_message_id_t *messageId,
+                                          pulsar_result_callback callback);
+
+/**
+ * Acknowledge the reception of all the messages in the stream up to (and 
including)
+ * the provided message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the messages will not be re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to 
ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(const Message&, 
ResultCallback) and
+ * waiting for the callback to be triggered.
+ *
+ * @param message the last message in the stream to acknowledge
+ * @return ResultOk if the message was successfully acknowledged. All 
previously delivered messages for
+ * this topic are also acknowledged.
+ * @return ResultError if there was a failure
+ */
+pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t 
*consumer, pulsar_message_t *message);
+
+pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t 
*consumer,
+                                                        pulsar_message_id_t 
*messageId);
+
+/**
+ * Asynchronously acknowledge the reception of all the messages in the stream 
up to (and
+ * including) the provided message.
+ *
+ * This method will initiate the operation and return immediately. The 
provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been 
acknowledged
+ */
+void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, 
pulsar_message_t *message,
+                                                  pulsar_result_callback 
callback);
+
+void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t 
*consumer,
+                                                     pulsar_message_id_t 
*messageId,
+                                                     pulsar_result_callback 
callback);
+
+pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
+
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, 
pulsar_result_callback callback);
+
+void pulsar_consumer_free(pulsar_consumer_t *consumer);
+
+/*
+ * Pause receiving messages via the messageListener, till 
resumeMessageListener() is called.
+ */
+pulsar_result pulsar_consumer_pause_message_listener(pulsar_consumer_t 
*consumer);
+
+/*
+ * Resume receiving the messages via the messageListener.
+ * Asynchronously receive all the messages enqueued from time 
pauseMessageListener() was called.
+ */
+pulsar_result resume_message_listener(pulsar_consumer_t *consumer);
+
+/**
+ * Redelivers all the unacknowledged messages. In Failover mode, the request 
is ignored if the consumer is
+ * not
+ * active for the given topic. In Shared mode, the consumers messages to be 
redelivered are distributed
+ * across all
+ * the connected consumers. This is a non blocking call and doesn't throw an 
exception. In case the
+ * connection
+ * breaks, the messages are redelivered after reconnect.
+ */
+void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
new file mode 100644
index 0000000..3bd9571
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_consumer_configuration pulsar_consumer_configuration_t;
+
+typedef enum {
+    /**
+     * There can be only 1 consumer on the same topic with the same 
consumerName
+     */
+    pulsar_ConsumerExclusive,
+
+    /**
+     * Multiple consumers will be able to use the same consumerName and the 
messages
+     *  will be dispatched according to a round-robin rotation between the 
connected consumers
+     */
+    pulsar_ConsumerShared,
+
+    /** Only one consumer is active on the subscription; Subscription can have 
N consumers
+     *  connected one of which will get promoted to master if the current 
master becomes inactive
+     */
+    pulsar_ConsumerFailover
+} pulsar_consumer_type;
+
+/// Callback definition for MessageListener
+typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, 
pulsar_message_t *msg);
+
+pulsar_consumer_configuration_t *pulsar_consumer_configuration_create();
+
+void pulsar_consumer_configuration_free(pulsar_consumer_configuration_t 
*consumer_configuration);
+
+/**
+ * Specify the consumer type. The consumer type enables
+ * specifying the type of subscription. In Exclusive subscription,
+ * only a single consumer is allowed to attach to the subscription. Other 
consumers
+ * will get an error message. In Shared subscription, multiple consumers will 
be
+ * able to use the same subscription name and the messages will be dispatched 
in a
+ * round robin fashion. In Failover subscription, a primary-failover 
subscription model
+ * allows for multiple consumers to attach to a single subscription, though 
only one
+ * of them will be “master” at a given time. Only the primary consumer will 
receive
+ * messages. When the primary consumer gets disconnected, one among the 
failover
+ * consumers will be promoted to primary and will start getting messages.
+ */
+void 
pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                                     pulsar_consumer_type 
consumerType);
+
+pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * A message listener enables your application to configure how to process
+ * and acknowledge messages delivered. A listener will be called in order
+ * for every message received.
+ */
+void pulsar_consumer_configuration_set_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration, 
pulsar_message_listener messageListener);
+
+int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                         pulsar_consumer_t *consumer);
+
+/**
+ * Sets the size of the consumer receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by 
the Consumer before the
+ * application calls receive(). Using a higher value could potentially 
increase the consumer throughput
+ * at the expense of bigger memory utilization.
+ *
+ * Setting the consumer queue size as zero decreases the throughput of the 
consumer, by disabling
+ * pre-fetching of
+ * messages. This approach improves the message distribution on shared 
subscription, by pushing messages
+ * only to
+ * the consumers that are ready to process them. Neither receive with timeout 
nor Partitioned Topics can
+ * be
+ * used if the consumer queue size is zero. The receive() function call should 
not be interrupted when
+ * the consumer queue size is zero.
+ *
+ * Default value is 1000 messages and should be good for most use cases.
+ *
+ * @param size
+ *            the new receiver queue size value
+ */
+void pulsar_consumer_configuration_set_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration, int size);
+
+int pulsar_consumer_configuration_get_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * Set the max total receiver queue size across partitons.
+ * <p>
+ * This setting will be used to reduce the receiver queue size for individual 
partitions
+ * {@link #setReceiverQueueSize(int)} if the total exceeds this value 
(default: 50000).
+ *
+ * @param maxTotalReceiverQueueSizeAcrossPartitions
+ */
+void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration, int 
maxTotalReceiverQueueSizeAcrossPartitions);
+
+/**
+ * @return the configured max total receiver queue size across partitions
+ */
+int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
+void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                       const char *consumerName);
+
+const char *pulsar_consumer_get_consumer_name(pulsar_consumer_configuration_t 
*consumer_configuration);
+
+/**
+ * Set the timeout in milliseconds for unacknowledged messages, the timeout 
needs to be greater than
+ * 10 seconds. An Exception is thrown if the given value is less than 10000 
(10 seconds).
+ * If a successful acknowledgement is not sent within the timeout all the 
unacknowledged messages are
+ * redelivered.
+ * @param timeout in milliseconds
+ */
+void 
pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                                     const uint64_t 
milliSeconds);
+
+/**
+ * @return the configured timeout in milliseconds for unacked messages.
+ */
+long 
pulsar_consumer_get_unacked_messages_timeout_ms(pulsar_consumer_configuration_t 
*consumer_configuration);
+
+int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t 
*consumer_configuration);
+
+// const CryptoKeyReaderPtr getCryptoKeyReader()
+//
+// const;
+// ConsumerConfiguration&
+// setCryptoKeyReader(CryptoKeyReaderPtr
+// cryptoKeyReader);
+//
+// ConsumerCryptoFailureAction getCryptoFailureAction()
+//
+// const;
+// ConsumerConfiguration&
+// setCryptoFailureAction(ConsumerCryptoFailureAction
+// action);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h 
b/pulsar-client-cpp/include/pulsar/c/message.h
new file mode 100644
index 0000000..9645b02
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+#include <stdint.h>
+
+typedef struct _pulsar_message pulsar_message_t;
+typedef struct _pulsar_message_id pulsar_message_id_t;
+
+pulsar_message_t *pulsar_message_create();
+void pulsar_message_free(pulsar_message_t *message);
+
+/// Builder
+
+void pulsar_message_set_content(pulsar_message_t *message, const void *data, 
size_t size);
+
+/**
+ * Set content of the message to a buffer already allocated by the caller. No 
copies of
+ * this buffer will be made. The caller is responsible to ensure the memory 
buffer is
+ * valid until the message has been persisted (or an error is returned).
+ */
+void pulsar_message_set_allocated_content(pulsar_message_t *message, void 
*data, size_t size);
+
+void pulsar_message_set_property(pulsar_message_t *message, const char *name, 
const char *value);
+
+/*
+ * set partition key for the message routing
+ * @param hash of this key is used to determine message's topic partition
+ */
+void pulsar_message_set_partition_key(pulsar_message_t *message, const char 
*partitionKey);
+
+/**
+ * Set the event timestamp for the message.
+ */
+void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t 
eventTimestamp);
+
+/**
+ * Specify a custom sequence id for the message being published.
+ * <p>
+ * The sequence id can be used for deduplication purposes and it needs to 
follow these rules:
+ * <ol>
+ * <li><code>sequenceId >= 0</code>
+ * <li>Sequence id for a message needs to be greater than sequence id for 
earlier messages:
+ * <code>sequenceId(N+1) > sequenceId(N)</code>
+ * <li>It's not necessary for sequence ids to be consecutive. There can be 
holes between messages. Eg. the
+ * <code>sequenceId</code> could represent an offset or a cumulative size.
+ * </ol>
+ *
+ * @param sequenceId
+ *            the sequence id to assign to the current message
+ */
+void pulsar_message_set_sequence_id(pulsar_message_t *message, int64_t 
sequenceId);
+
+/**
+ * override namespace replication clusters.  note that it is the
+ * caller's responsibility to provide valid cluster names, and that
+ * all clusters have been previously configured as topics.
+ *
+ * given an empty list, the message will replicate per the namespace
+ * configuration.
+ *
+ * @param clusters where to send this message.
+ */
+void pulsar_message_set_replication_clusters(pulsar_message_t *message, const 
char **clusters);
+
+/**
+ * Do not replicate this message
+ * @param flag if true, disable replication, otherwise use default
+ * replication
+ */
+void pulsar_message_disable_replication(pulsar_message_t *message, int flag);
+
+/// Accessor for built messages
+
+/**
+ * Return the properties attached to the message.
+ * Properties are application defined key/value pairs that will be attached to 
the message
+ *
+ * @return an unmodifiable view of the properties map
+ */
+// const StringMap& getProperties() const;
+
+/**
+ * Check whether the message has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the message has the specified property
+ * @return false if the property is not defined
+ */
+int pulsar_message_has_property(pulsar_message_t *message, const char *name);
+
+/**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or null if the property was not defined
+ */
+const char *pulsar_message_get_property(pulsar_message_t *message, const char 
*name);
+
+/**
+ * Get the content of the message
+ *
+ *
+ * @return the pointer to the message payload
+ */
+const void *pulsar_message_get_data(pulsar_message_t *message);
+
+/**
+ * Get the length of the message
+ *
+ * @return the length of the message payload
+ */
+uint32_t pulsar_message_get_length(pulsar_message_t *message);
+
+/**
+ * Get the unique message ID associated with this message.
+ *
+ * The message id can be used to univocally refer to a message without having 
to keep the entire payload
+ * in memory.
+ *
+ * Only messages received from the consumer will have a message id assigned.
+ *
+ */
+pulsar_message_id_t *pulsar_message_get_message_id(pulsar_message_t *message);
+
+/**
+ * Get the partition key for this message
+ * @return key string that is hashed to determine message's topic partition
+ */
+const char *pulsar_message_get_partitionKey(pulsar_message_t *message);
+int pulsar_message_has_partition_key(pulsar_message_t *message);
+
+/**
+ * Get the UTC based timestamp in milliseconds referring to when the message 
was published by the client
+ * producer
+ */
+uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message);
+
+/**
+ * Get the event timestamp associated with this message. It is set by the 
client producer.
+ */
+uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_id.h 
b/pulsar-client-cpp/include/pulsar/c/message_id.h
new file mode 100644
index 0000000..a0eb684
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message_id.h
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+#include <stdint.h>
+
+typedef struct _pulsar_message_id pulsar_message_id_t;
+
+/**
+ * MessageId representing the "earliest" or "oldest available" message stored 
in the topic
+ */
+const pulsar_message_id_t *pulsar_message_id_earliest();
+
+/**
+ * MessageId representing the "latest" or "last published" message in the topic
+ */
+const pulsar_message_id_t *pulsar_message_id_latest();
+
+/**
+ * Serialize the message id into a binary string for storing
+ */
+const void *pulsar_message_id_serialize(int *len);
+
+/**
+ * Deserialize a message id from a binary string
+ */
+pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, 
uint32_t len);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_router.h 
b/pulsar-client-cpp/include/pulsar/c/message_router.h
new file mode 100644
index 0000000..aea4188
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message_router.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/c/message.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_topic_metadata pulsar_topic_metadata_t;
+
+typedef int (*pulsar_message_router)(pulsar_message_t *msg, 
pulsar_topic_metadata_t *topicMetadata);
+
+int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t 
*topicMetadata);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/producer.h 
b/pulsar-client-cpp/include/pulsar/c/producer.h
new file mode 100644
index 0000000..6121e06
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/producer.h
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#include <stdint.h>
+
+typedef struct _pulsar_producer pulsar_producer_t;
+
+typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg);
+typedef void (*pulsar_close_callback)(pulsar_result);
+
+/**
+ * @return the topic to which producer is publishing to
+ */
+const char *pulsar_producer_get_topic(pulsar_producer_t *producer);
+
+/**
+ * @return the producer name which could have been assigned by the system or 
specified by the client
+ */
+const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer);
+
+/**
+ * Publish a message on the topic associated with this Producer.
+ *
+ * This method will block until the message will be accepted and persisted
+ * by the broker. In case of errors, the client library will try to
+ * automatically recover and use a different broker.
+ *
+ * If it wasn't possible to successfully publish the message within the 
sendTimeout,
+ * an error will be returned.
+ *
+ * This method is equivalent to asyncSend() and wait until the callback is 
triggered.
+ *
+ * @param msg message to publish
+ * @return ResultOk if the message was published successfully
+ * @return ResultWriteError if it wasn't possible to publish the message
+ */
+pulsar_result pulsar_producer_send(pulsar_producer_t *producer, 
pulsar_message_t *msg);
+
+/**
+ * Asynchronously publish a message on the topic associated with this Producer.
+ *
+ * This method will initiate the publish operation and return immediately. The
+ * provided callback will be triggered when the message has been be accepted 
and persisted
+ * by the broker. In case of errors, the client library will try to
+ * automatically recover and use a different broker.
+ *
+ * If it wasn't possible to successfully publish the message within the 
sendTimeout, the
+ * callback will be triggered with a Result::WriteError code.
+ *
+ * @param msg message to publish
+ * @param callback the callback to get notification of the completion
+ */
+void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t 
*msg,
+                                pulsar_send_callback callback);
+
+/**
+ * Get the last sequence id that was published by this producer.
+ *
+ * This represent either the automatically assigned or custom sequence id (set 
on the MessageBuilder) that
+ * was published and acknowledged by the broker.
+ *
+ * After recreating a producer with the same producer name, this will return 
the last message that was
+ * published in
+ * the previous producer session, or -1 if there no message was ever published.
+ *
+ * @return the last sequence id published by this producer
+ */
+int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer);
+
+/**
+ * Close the producer and release resources allocated.
+ *
+ * No more writes will be accepted from this producer. Waits until
+ * all pending write requests are persisted. In case of errors,
+ * pending writes will not be retried.
+ *
+ * @return an error code to indicate the success or failure
+ */
+pulsar_result pulsar_producer_close(pulsar_producer_t *producer);
+
+/**
+ * Close the producer and release resources allocated.
+ *
+ * No more writes will be accepted from this producer. The provided callback 
will be
+ * triggered when all pending write requests are persisted. In case of errors,
+ * pending writes will not be retried.
+ */
+void pulsar_producer_close_async(pulsar_producer_t *producer, 
pulsar_close_callback callback);
+
+void pulsar_producer_free(pulsar_producer_t *producer);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
new file mode 100644
index 0000000..534c411
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+
+#include <pulsar/c/message_router.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef enum {
+    pulsar_UseSinglePartition,
+    pulsar_RoundRobinDistribution,
+    pulsar_CustomPartition
+} pulsar_partitions_routing_mode;
+
+typedef enum { pulsar_Murmur3_32Hash, pulsar_BoostHash, pulsar_JavaStringHash 
} pulsar_hashing_scheme;
+
+typedef enum {
+    pulsar_CompressionNone = 0,
+    pulsar_CompressionLZ4 = 1,
+    pulsar_CompressionZLib = 2
+} pulsar_compression_type;
+
+typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
+
+pulsar_producer_configuration_t *pulsar_producer_configuration_create();
+
+void pulsar_producer_configuration_free(pulsar_producer_configuration_t *conf);
+
+void 
pulsar_producer_configuration_set_producer_name(pulsar_producer_configuration_t 
*conf,
+                                                     const char *producerName);
+
+const char 
*pulsar_producer_configuration_get_producer_name(pulsar_producer_configuration_t
 *conf);
+
+void 
pulsar_producer_configuration_set_send_timeout(pulsar_producer_configuration_t 
*conf, int sendTimeoutMs);
+
+int 
pulsar_producer_configuration_get_send_timeout(pulsar_producer_configuration_t 
*conf);
+
+void 
pulsar_producer_configuration_set_initial_sequence_id(pulsar_producer_configuration_t
 *conf,
+                                                           int64_t 
initialSequenceId);
+
+int64_t 
pulsar_producer_configuration_get_initial_sequence_id(pulsar_producer_configuration_t
 *conf);
+
+void 
pulsar_producer_configuration_set_compression_type(pulsar_producer_configuration_t
 *conf,
+                                                        
pulsar_compression_type compressionType);
+
+pulsar_compression_type pulsar_producer_configuration_get_compression_type(
+    pulsar_producer_configuration_t *conf);
+
+void 
pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t
 *conf,
+                                                            int 
maxPendingMessages);
+
+int 
pulsar_producer_configuration_get_max_pending_messages(pulsar_producer_configuration_t
 *conf);
+
+/**
+ * Set the number of max pending messages across all the partitions
+ * <p>
+ * This setting will be used to lower the max pending messages for each 
partition
+ * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured 
value.
+ *
+ * @param maxPendingMessagesAcrossPartitions
+ */
+void pulsar_producer_configuration_set_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf, int 
maxPendingMessagesAcrossPartitions);
+
+/**
+ *
+ * @return the maximum number of pending messages allowed across all the 
partitions
+ */
+int pulsar_producer_configuration_get_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf);
+
+void 
pulsar_producer_configuration_set_partitions_routing_mode(pulsar_producer_configuration_t
 *conf,
+                                                               
pulsar_partitions_routing_mode mode);
+
+pulsar_partitions_routing_mode 
pulsar_producer_configuration_get_partitions_routing_mode(
+    pulsar_producer_configuration_t *conf);
+
+void 
pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t
 *conf,
+                                                      pulsar_message_router 
router);
+
+void 
pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t
 *conf,
+                                                      pulsar_hashing_scheme 
scheme);
+
+pulsar_hashing_scheme 
pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t
 *conf);
+
+void 
pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t
 *conf,
+                                                           int 
blockIfQueueFull);
+
+int 
pulsar_producer_configuration_get_block_if_queue_full(pulsar_producer_configuration_t
 *conf);
+
+// Zero queue size feature will not be supported on consumer end if batching 
is enabled
+void 
pulsar_producer_configuration_set_batching_enabled(pulsar_producer_configuration_t
 *conf,
+                                                        int batchingEnabled);
+
+int 
pulsar_producer_configuration_get_batching_enabled(pulsar_producer_configuration_t
 *conf);
+
+void 
pulsar_producer_configuration_set_batching_max_messages(pulsar_producer_configuration_t
 *conf,
+                                                             unsigned int 
batchingMaxMessages);
+
+unsigned int 
pulsar_producer_configuration_get_batching_max_messages(pulsar_producer_configuration_t
 *conf);
+
+void pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf, unsigned long 
batchingMaxAllowedSizeInBytes);
+
+unsigned long 
pulsar_producer_configuration_get_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf);
+
+void 
pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_producer_configuration_t
 *conf,
+                                                                     unsigned 
long batchingMaxPublishDelayMs);
+
+unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf);
+
+// const CryptoKeyReaderPtr getCryptoKeyReader() const;
+// ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr 
cryptoKeyReader);
+//
+// ProducerCryptoFailureAction getCryptoFailureAction() const;
+// ProducerConfiguration &setCryptoFailureAction(ProducerCryptoFailureAction 
action);
+//
+// std::set <std::string> &getEncryptionKeys();
+// int isEncryptionEnabled() const;
+// ProducerConfiguration &addEncryptionKey(std::string key);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/reader.h 
b/pulsar-client-cpp/include/pulsar/c/reader.h
new file mode 100644
index 0000000..648c172
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/reader.h
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_reader pulsar_reader_t;
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+/**
+ * @return the topic this reader is reading from
+ */
+const char *pulsar_reader_get_topic(pulsar_reader_t *reader);
+
+/**
+ * Read a single message.
+ *
+ * If a message is not immediately available, this method will block until a 
new
+ * message is available.
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @return ResultOk when a message is received
+ * @return ResultInvalidConfiguration if a message listener had been set in 
the configuration
+ */
+pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, 
pulsar_message_t **msg);
+
+/**
+ * Read a single message
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @param timeoutMs the receive timeout in milliseconds
+ * @return ResultOk if a message was received
+ * @return ResultTimeout if the receive timeout was triggered
+ * @return ResultInvalidConfiguration if a message listener had been set in 
the configuration
+ */
+pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, 
pulsar_message_t **msg,
+                                                   int timeoutMs);
+
+pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
+
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback 
callback);
+
+void pulsar_reader_free(pulsar_reader_t *reader);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
new file mode 100644
index 0000000..914bbd9
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_reader_configuration pulsar_reader_configuration_t;
+
+typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, 
pulsar_message_t *msg);
+
+pulsar_reader_configuration_t *pulsar_reader_configuration_create();
+
+void pulsar_reader_configuration_free(pulsar_reader_configuration_t 
*configuration);
+
+/**
+ * A message listener enables your application to configure how to process
+ * messages. A listener will be called in order for every message received.
+ */
+void 
pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t 
*configuration,
+                                                     pulsar_reader_listener 
listener);
+
+int 
pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t 
*configuration);
+
+/**
+ * Sets the size of the reader receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by 
the Consumer before the
+ * application calls receive(). Using a higher value could potentially 
increase the consumer throughput
+ * at the expense of bigger memory utilization.
+ *
+ * Setting the consumer queue size as zero decreases the throughput of the 
consumer, by disabling
+ * pre-fetching of
+ * messages. This approach improves the message distribution on shared 
subscription, by pushing messages
+ * only to
+ * the consumers that are ready to process them. Neither receive with timeout 
nor Partitioned Topics can
+ * be
+ * used if the consumer queue size is zero. The receive() function call should 
not be interrupted when
+ * the consumer queue size is zero.
+ *
+ * Default value is 1000 messages and should be good for most use cases.
+ *
+ * @param size
+ *            the new receiver queue size value
+ */
+void 
pulsar_reader_configuration_set_receiver_queue_size(pulsar_reader_configuration_t
 *configuration,
+                                                         int size);
+
+int 
pulsar_reader_configuration_get_receiver_queue_size(pulsar_reader_configuration_t
 *configuration);
+
+void pulsar_reader_configuration_set_reader_name(pulsar_reader_configuration_t 
*configuration,
+                                                 const char *readerName);
+
+const char 
*pulsar_reader_configuration_get_reader_name(pulsar_reader_configuration_t 
*configuration);
+
+void 
pulsar_reader_configuration_set_subscription_role_prefix(pulsar_reader_configuration_t
 *configuration,
+                                                              const char 
*subscriptionRolePrefix);
+
+const char *pulsar_reader_configuration_get_subscription_role_prefix(
+    pulsar_reader_configuration_t *configuration);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/result.h 
b/pulsar-client-cpp/include/pulsar/c/result.h
new file mode 100644
index 0000000..0ca769d
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/result.h
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef enum {
+    pulsar_result_Ok,  /// Operation successful
+
+    pulsar_result_UnknownError,  /// Unknown error happened on broker
+
+    pulsar_result_InvalidConfiguration,  /// Invalid configuration
+
+    pulsar_result_Timeout,       /// Operation timed out
+    pulsar_result_LookupError,   /// Broker lookup failed
+    pulsar_result_ConnectError,  /// Failed to connect to broker
+    pulsar_result_ReadError,     /// Failed to read from socket
+
+    pulsar_result_AuthenticationError,             /// Authentication failed 
on broker
+    pulsar_result_AuthorizationError,              /// Client is not 
authorized to create producer/consumer
+    pulsar_result_ErrorGettingAuthenticationData,  /// Client cannot find 
authorization data
+
+    pulsar_result_BrokerMetadataError,     /// Broker failed in updating 
metadata
+    pulsar_result_BrokerPersistenceError,  /// Broker failed to persist entry
+    pulsar_result_ChecksumError,           /// Corrupt message checksum failure
+
+    pulsar_result_ConsumerBusy,   /// Exclusive consumer is already connected
+    pulsar_result_NotConnected,   /// Producer/Consumer is not currently 
connected to broker
+    pulsar_result_AlreadyClosed,  /// Producer/Consumer is already closed and 
not accepting any operation
+
+    pulsar_result_InvalidMessage,  /// Error in publishing an already used 
message
+
+    pulsar_result_ConsumerNotInitialized,         /// Consumer is not 
initialized
+    pulsar_result_ProducerNotInitialized,         /// Producer is not 
initialized
+    pulsar_result_TooManyLookupRequestException,  /// Too Many concurrent 
LookupRequest
+
+    pulsar_result_InvalidTopicName,  /// Invalid topic name
+    pulsar_result_InvalidUrl,        /// Client Initialized with Invalid 
Broker Url (VIP Url passed to Client
+                                     /// Constructor)
+    pulsar_result_ServiceUnitNotReady,  /// Service Unit unloaded between 
client did lookup and
+                                        /// producer/consumer got created
+
+    pulsar_result_OperationNotSupported,
+    pulsar_result_ProducerBlockedQuotaExceededError,      /// Producer is 
blocked
+    pulsar_result_ProducerBlockedQuotaExceededException,  /// Producer is 
getting exception
+    pulsar_result_ProducerQueueIsFull,                    /// Producer queue 
is full
+    pulsar_result_MessageTooBig,                          /// Trying to send a 
messages exceeding the max size
+    pulsar_result_TopicNotFound,                          /// Topic not found
+    pulsar_result_SubscriptionNotFound,                   /// Subscription not 
found
+    pulsar_result_ConsumerNotFound,                       /// Consumer not 
found
+    pulsar_result_UnsupportedVersionError,  /// Error when an older 
client/version doesn't support a required
+                                            /// feature
+    pulsar_result_TopicTerminated,          /// Topic was already terminated
+    pulsar_result_CryptoError               /// Error when crypto operation 
fails
+} pulsar_result;
+
+// Return string representation of result code
+const char *pulsar_result_str(pulsar_result result);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/lib/CMakeLists.txt 
b/pulsar-client-cpp/lib/CMakeLists.txt
index 052de8b..74dbc08 100644
--- a/pulsar-client-cpp/lib/CMakeLists.txt
+++ b/pulsar-client-cpp/lib/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc stats/*.cc)
+file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc stats/*.cc c/*.cc)
 
 execute_process(COMMAND cat ../pom.xml COMMAND xmllint --format - COMMAND sed 
"s/xmlns=\".*\"//g" COMMAND xmllint --stream --pattern /project/version --debug 
- COMMAND grep -A 2 "matches pattern" COMMAND grep text COMMAND sed "s/.* [0-9] 
//g" OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE PV)
 set (CMAKE_CXX_FLAGS " ${CMAKE_CXX_FLAGS} -D_PULSAR_VERSION_=\\\"${PV}\\\"")
diff --git a/pulsar-client-cpp/lib/c/c_Authentication.cc 
b/pulsar-client-cpp/lib/c/c_Authentication.cc
new file mode 100644
index 0000000..5cd2a64
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Authentication.cc
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/authentication.h>
+
+#include <pulsar/Authentication.h>
+
+#include "c_structs.h"
+
+pulsar_authentication_t *pulsar_authentication_create(const char 
*dynamicLibPath,
+                                                      const char 
*authParamsString) {
+    pulsar_authentication_t *authentication = new pulsar_authentication_t;
+    authentication->auth = pulsar::AuthFactory::create(dynamicLibPath, 
authParamsString);
+    return authentication;
+}
+
+void pulsar_authentication_free(pulsar_authentication_t *authentication) { 
delete authentication; }
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc 
b/pulsar-client-cpp/lib/c/c_Client.cc
new file mode 100644
index 0000000..cec7a13
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client.h>
+
+#include <boost/bind.hpp>
+
+#include "c_structs.h"
+
+pulsar_client_t *pulsar_client_create(const char *serviceUrl,
+                                      const pulsar_client_configuration_t 
*clientConfiguration) {
+    pulsar_client_t *c_client = new pulsar_client_t;
+    c_client->client.reset(new pulsar::Client(std::string(serviceUrl)));
+    return c_client;
+}
+
+void pulsar_client_free(pulsar_client_t *client) { delete client; }
+
+pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const 
char *topic,
+                                            const 
pulsar_producer_configuration_t *conf,
+                                            pulsar_producer_t **c_producer) {
+    pulsar::Producer producer;
+    pulsar::Result res = client->client->createProducer(topic, conf->conf, 
producer);
+    if (res == pulsar::ResultOk) {
+        (*c_producer) = new pulsar_producer_t;
+        (*c_producer)->producer = producer;
+        return pulsar_result_Ok;
+    } else {
+        return (pulsar_result)res;
+    }
+}
+
+static void handle_create_producer_callback(pulsar::Result result, 
pulsar::Producer producer,
+                                            pulsar_create_producer_callback 
callback) {
+    if (result == pulsar::ResultOk) {
+        pulsar_producer_t *c_producer = new pulsar_producer_t;
+        c_producer->producer = producer;
+        callback(pulsar_result_Ok, c_producer);
+    } else {
+        callback((pulsar_result)result, NULL);
+    }
+}
+
+void pulsar_client_create_producer_async(pulsar_client_t *client, const char 
*topic,
+                                         const pulsar_producer_configuration_t 
*conf,
+                                         pulsar_create_producer_callback 
callback) {
+    client->client->createProducerAsync(topic, conf->conf,
+                                        
boost::bind(&handle_create_producer_callback, _1, _2, callback));
+}
+
+pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char 
*topic,
+                                      const char *subscriptionName,
+                                      const pulsar_consumer_configuration_t 
*conf,
+                                      pulsar_consumer_t **c_consumer) {
+    pulsar::Consumer consumer;
+    pulsar::Result res =
+        client->client->subscribe(topic, subscriptionName, 
conf->consumerConfiguration, consumer);
+    if (res == pulsar::ResultOk) {
+        (*c_consumer) = new pulsar_consumer_t;
+        (*c_consumer)->consumer = consumer;
+        return pulsar_result_Ok;
+    } else {
+        return (pulsar_result)res;
+    }
+}
+
+static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer 
consumer,
+                                      pulsar_subscribe_callback callback) {
+    if (result == pulsar::ResultOk) {
+        pulsar_consumer_t *c_consumer = new pulsar_consumer_t;
+        c_consumer->consumer = consumer;
+        callback(pulsar_result_Ok, c_consumer);
+    } else {
+        callback((pulsar_result)result, NULL);
+    }
+}
+
+void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, 
const char *subscriptionName,
+                                   const pulsar_consumer_configuration_t 
*conf, pulsar_consumer_t **consumer,
+                                   pulsar_subscribe_callback callback) {
+    client->client->subscribeAsync(topic, subscriptionName, 
conf->consumerConfiguration,
+                                   boost::bind(&handle_subscribe_callback, _1, 
_2, callback));
+}
+
+pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char 
*topic,
+                                          const pulsar_message_id_t 
*startMessageId,
+                                          pulsar_reader_configuration_t *conf, 
pulsar_reader_t **c_reader) {
+    pulsar::Reader reader;
+    pulsar::Result res = client->client->createReader(topic, 
startMessageId->messageId, conf->conf, reader);
+    if (res == pulsar::ResultOk) {
+        (*c_reader) = new pulsar_reader_t;
+        (*c_reader)->reader = reader;
+        return pulsar_result_Ok;
+    } else {
+        return (pulsar_result)res;
+    }
+}
+
+static void handle_reader_callback(pulsar::Result result, pulsar::Reader 
reader,
+                                   pulsar_reader_callback callback) {
+    if (result == pulsar::ResultOk) {
+        pulsar_reader_t *c_reader = new pulsar_reader_t;
+        c_reader->reader = reader;
+        callback(pulsar_result_Ok, c_reader);
+    } else {
+        callback((pulsar_result)result, NULL);
+    }
+}
+
+void pulsar_client_create_reader_async(pulsar_client_t *client, const char 
*topic,
+                                       const pulsar_message_id_t 
*startMessageId,
+                                       pulsar_reader_configuration_t *conf, 
pulsar_reader_t **reader,
+                                       pulsar_reader_callback callback) {
+    client->client->createReaderAsync(topic, startMessageId->messageId, 
conf->conf,
+                                      boost::bind(&handle_reader_callback, _1, 
_2, callback));
+}
+
+pulsar_result pulsar_client_close(pulsar_client_t *client) { return 
(pulsar_result)client->client->close(); }
+
+static void handle_client_close(pulsar::Result result, pulsar_close_callback 
callback) {
+    callback((pulsar_result)result);
+}
+
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback 
callback) {
+    client->client->closeAsync(boost::bind(handle_client_close, _1, callback));
+}
diff --git a/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
new file mode 100644
index 0000000..8caf8e8
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client_configuration.h>
+
+#include "c_structs.h"
+
+pulsar_client_configuration_t *pulsar_client_configuration_create() {
+    pulsar_client_configuration_t *c_conf = new pulsar_client_configuration_t;
+    c_conf->conf = pulsar::ClientConfiguration();
+    return c_conf;
+}
+
+void pulsar_client_configuration_free(pulsar_client_configuration_t *conf) { 
delete conf; }
+
+void pulsar_client_configuration_set_auth(pulsar_client_configuration_t *conf,
+                                          pulsar_authentication_t 
*authentication) {
+    conf->conf.setAuth(authentication->auth);
+}
+
+void 
pulsar_client_configuration_set_operation_timeout_seconds(pulsar_client_configuration_t
 *conf,
+                                                               int timeout) {
+    conf->conf.setOperationTimeoutSeconds(timeout);
+}
+
+int 
pulsar_client_configuration_get_operation_timeout_seconds(pulsar_client_configuration_t
 *conf) {
+    return conf->conf.getOperationTimeoutSeconds();
+}
+
+void pulsar_client_configuration_set_io_threads(pulsar_client_configuration_t 
*conf, int threads) {
+    conf->conf.setIOThreads(threads);
+}
+
+int pulsar_client_configuration_get_io_threads(pulsar_client_configuration_t 
*conf) {
+    return conf->conf.getIOThreads();
+}
+
+void 
pulsar_client_configuration_set_message_listener_threads(pulsar_client_configuration_t
 *conf,
+                                                              int threads) {
+    conf->conf.setMessageListenerThreads(threads);
+}
+
+int 
pulsar_client_configuration_get_message_listener_threads(pulsar_client_configuration_t
 *conf) {
+    return conf->conf.getMessageListenerThreads();
+}
+
+void 
pulsar_client_configuration_set_concurrent_lookup_request(pulsar_client_configuration_t
 *conf,
+                                                               int 
concurrentLookupRequest) {
+    conf->conf.setConcurrentLookupRequest(concurrentLookupRequest);
+}
+
+int 
pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_configuration_t
 *conf) {
+    return conf->conf.getConcurrentLookupRequest();
+}
+
+void 
pulsar_client_configuration_set_log_conf_file_path(pulsar_client_configuration_t
 *conf,
+                                                        const char 
*logConfFilePath) {
+    conf->conf.setLogConfFilePath(logConfFilePath);
+}
+
+const char 
*pulsar_client_configuration_get_log_conf_file_path(pulsar_client_configuration_t
 *conf) {
+    return conf->conf.getLogConfFilePath().c_str();
+}
+
+void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t 
*conf, int useTls) {
+    conf->conf.setUseTls(useTls);
+}
+
+int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t 
*conf) {
+    return conf->conf.isUseTls();
+}
+
+void 
pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_configuration_t
 *conf,
+                                                               const char 
*tlsTrustCertsFilePath) {
+    conf->conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+}
+
+const char 
*pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t
 *conf) {
+    return conf->conf.getTlsTrustCertsFilePath().c_str();
+}
+
+void 
pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t
 *conf,
+                                                                   int 
allowInsecure) {
+    conf->conf.setTlsAllowInsecureConnection(allowInsecure);
+}
+
+int 
pulsar_client_configuration_is_tls_allow_insecure_connection(pulsar_client_configuration_t
 *conf) {
+    return conf->conf.isTlsAllowInsecureConnection();
+}
+
+void 
pulsar_client_configuration_set_stats_interval_in_seconds(pulsar_client_configuration_t
 *conf,
+                                                               const unsigned 
int interval) {
+    conf->conf.setStatsIntervalInSeconds(interval);
+}
+
+const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds(
+    pulsar_client_configuration_t *conf) {
+    return conf->conf.getStatsIntervalInSeconds();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc 
b/pulsar-client-cpp/lib/c/c_Consumer.cc
new file mode 100644
index 0000000..22021a5
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/consumer.h>
+
+#include "c_structs.h"
+
+const char *pulsar_consumer_get_topic(pulsar_consumer_t *consumer) {
+    return consumer->consumer.getTopic().c_str();
+}
+
+const char *pulsar_consumer_get_subscription_name(pulsar_consumer_t *consumer) 
{
+    return consumer->consumer.getSubscriptionName().c_str();
+}
+
+pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) {
+    return (pulsar_result)consumer->consumer.unsubscribe();
+}
+
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, 
pulsar_result_callback callback) {
+    consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, 
_1, callback));
+}
+
+pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, 
pulsar_message_t **msg) {
+    pulsar::Message message;
+    pulsar::Result res = consumer->consumer.receive(message);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t 
*consumer, pulsar_message_t **msg,
+                                                   int timeoutMs) {
+    pulsar::Message message;
+    pulsar::Result res = consumer->consumer.receive(message, timeoutMs);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_consumer_acknowledge(pulsar_consumer_t *consumer, 
pulsar_message_t *message) {
+    return (pulsar_result)consumer->consumer.acknowledge(message->message);
+}
+
+pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, 
pulsar_message_id_t *messageId) {
+    return (pulsar_result)consumer->consumer.acknowledge(messageId->messageId);
+}
+
+void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, 
pulsar_message_t *message,
+                                       pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeAsync(message->message, 
boost::bind(handle_result_callback, _1, callback));
+}
+
+void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, 
pulsar_message_id_t *messageId,
+                                          pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeAsync(messageId->messageId,
+                                        boost::bind(handle_result_callback, 
_1, callback));
+}
+
+pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t 
*consumer, pulsar_message_t *message) {
+    return 
(pulsar_result)consumer->consumer.acknowledgeCumulative(message->message);
+}
+
+pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t 
*consumer,
+                                                        pulsar_message_id_t 
*messageId) {
+    return (pulsar_result)consumer->consumer.acknowledge(messageId->messageId);
+}
+
+void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, 
pulsar_message_t *message,
+                                                  pulsar_result_callback 
callback) {
+    consumer->consumer.acknowledgeCumulativeAsync(message->message,
+                                                  
boost::bind(handle_result_callback, _1, callback));
+}
+
+void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t 
*consumer,
+                                                     pulsar_message_id_t 
*messageId,
+                                                     pulsar_result_callback 
callback) {
+    consumer->consumer.acknowledgeCumulativeAsync(messageId->messageId,
+                                                  
boost::bind(handle_result_callback, _1, callback));
+}
+
+pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
+    return (pulsar_result)consumer->consumer.close();
+}
+
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, 
pulsar_result_callback callback) {
+    consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, 
callback));
+}
+
+void pulsar_consumer_free(pulsar_consumer_t *consumer) { delete consumer; }
+
+pulsar_result pulsar_consumer_pause_message_listener(pulsar_consumer_t 
*consumer) {
+    return (pulsar_result)consumer->consumer.pauseMessageListener();
+}
+
+pulsar_result resume_message_listener(pulsar_consumer_t *consumer) {
+    return (pulsar_result)consumer->consumer.resumeMessageListener();
+}
+
+void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer) {
+    return consumer->consumer.redeliverUnacknowledgedMessages();
+}
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
new file mode 100644
index 0000000..dcd0aac
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/consumer_configuration.h>
+
+#include "c_structs.h"
+
+pulsar_consumer_configuration_t *pulsar_consumer_configuration_create() {
+    return new pulsar_consumer_configuration_t;
+}
+
+void pulsar_consumer_configuration_free(pulsar_consumer_configuration_t 
*consumer_configuration) {
+    delete consumer_configuration;
+}
+
+void 
pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                                     pulsar_consumer_type 
consumerType) {
+    
consumer_configuration->consumerConfiguration.setConsumerType((pulsar::ConsumerType)consumerType);
+}
+
+pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return 
(pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
+}
+
+static void message_listener_callback(pulsar::Consumer consumer, const 
pulsar::Message &msg,
+                                      pulsar_message_listener listener) {
+    pulsar_consumer_t c_consumer;
+    c_consumer.consumer = consumer;
+    pulsar_message_t *message = new pulsar_message_t;
+    message->message = msg;
+    listener(&c_consumer, message);
+}
+
+void pulsar_consumer_configuration_set_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration, 
pulsar_message_listener messageListener) {
+    consumer_configuration->consumerConfiguration.setMessageListener(
+        boost::bind(message_listener_callback, _1, _2, messageListener));
+}
+
+int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t 
*consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.hasMessageListener();
+}
+
+void pulsar_consumer_configuration_set_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration, int size) {
+    consumer_configuration->consumerConfiguration.setReceiverQueueSize(size);
+}
+
+int pulsar_consumer_configuration_get_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return 
consumer_configuration->consumerConfiguration.getReceiverQueueSize();
+}
+
+void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration, int 
maxTotalReceiverQueueSizeAcrossPartitions) {
+    
consumer_configuration->consumerConfiguration.setMaxTotalReceiverQueueSizeAcrossPartitions(
+        maxTotalReceiverQueueSizeAcrossPartitions);
+}
+
+int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return 
consumer_configuration->consumerConfiguration.getMaxTotalReceiverQueueSizeAcrossPartitions();
+}
+
+void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                       const char *consumerName) {
+    
consumer_configuration->consumerConfiguration.setConsumerName(consumerName);
+}
+
+const char *pulsar_consumer_get_consumer_name(pulsar_consumer_configuration_t 
*consumer_configuration) {
+    return 
consumer_configuration->consumerConfiguration.getConsumerName().c_str();
+}
+
+void 
pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configuration_t 
*consumer_configuration,
+                                                     const uint64_t 
milliSeconds) {
+    
consumer_configuration->consumerConfiguration.setUnAckedMessagesTimeoutMs(milliSeconds);
+}
+
+long pulsar_consumer_get_unacked_messages_timeout_ms(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return 
consumer_configuration->consumerConfiguration.getUnAckedMessagesTimeoutMs();
+}
+
+int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t 
*consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.isEncryptionEnabled();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Message.cc 
b/pulsar-client-cpp/lib/c/c_Message.cc
new file mode 100644
index 0000000..d87560e
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Message.cc
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message.h>
+#include "c_structs.h"
+
+pulsar_message_t *pulsar_message_create() { return new pulsar_message_t; }
+
+void pulsar_message_free(pulsar_message_t *message) { delete message; }
+
+void pulsar_message_set_content(pulsar_message_t *message, const void *data, 
size_t size) {
+    message->builder.setContent(data, size);
+}
+
+void pulsar_message_set_allocated_content(pulsar_message_t *message, void 
*data, size_t size) {
+    message->builder.setAllocatedContent(data, size);
+}
+
+void pulsar_message_set_property(pulsar_message_t *message, const char *name, 
const char *value) {
+    message->builder.setProperty(name, value);
+}
+
+void pulsar_message_set_partition_key(pulsar_message_t *message, const char 
*partitionKey) {
+    message->builder.setPartitionKey(partitionKey);
+}
+
+void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t 
eventTimestamp) {
+    message->builder.setEventTimestamp(eventTimestamp);
+}
+
+void pulsar_message_set_sequence_id(pulsar_message_t *message, int64_t 
sequenceId) {
+    message->builder.setSequenceId(sequenceId);
+}
+
+void pulsar_message_set_replication_clusters(pulsar_message_t *message, const 
char **clusters) {
+    const char *c = clusters[0];
+    std::vector<std::string> clustersList;
+    while (c) {
+        clustersList.push_back(c);
+        ++c;
+    }
+
+    message->builder.setReplicationClusters(clustersList);
+}
+
+void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
+    message->builder.disableReplication(flag);
+}
+
+int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
+    return message->message.hasProperty(name);
+}
+
+const char *pulsar_message_get_property(pulsar_message_t *message, const char 
*name) {
+    return message->message.getProperty(name).c_str();
+}
+
+const void *pulsar_message_get_data(pulsar_message_t *message) { return 
message->message.getData(); }
+
+uint32_t pulsar_message_get_length(pulsar_message_t *message) { return 
message->message.getLength(); }
+
+pulsar_message_id_t *pulsar_message_get_message_id(pulsar_message_t *message) {
+    pulsar_message_id_t *messageId = new pulsar_message_id_t;
+    messageId->messageId = message->message.getMessageId();
+    return messageId;
+}
+
+const char *pulsar_message_get_partitionKey(pulsar_message_t *message) {
+    return message->message.getPartitionKey().c_str();
+}
+
+int pulsar_message_has_partition_key(pulsar_message_t *message) { return 
message->message.hasPartitionKey(); }
+
+uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message) {
+    return message->message.getPublishTimestamp();
+}
+
+uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message) {
+    return message->message.getEventTimestamp();
+}
diff --git a/pulsar-client-cpp/lib/c/c_MessageId.cc 
b/pulsar-client-cpp/lib/c/c_MessageId.cc
new file mode 100644
index 0000000..5728359
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_MessageId.cc
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message_id.h>
+#include "c_structs.h"
+
+#include <boost/thread/once.hpp>
+
+boost::once_flag initialized = BOOST_ONCE_INIT;
+
+static pulsar_message_id_t earliest;
+static pulsar_message_id_t latest;
+
+static void initialize() {
+    earliest.messageId = pulsar::MessageId::earliest();
+    latest.messageId = pulsar::MessageId::latest();
+}
+
+const pulsar_message_id_t *pulsar_message_id_earliest() {
+    boost::call_once(&initialize, initialized);
+    return &earliest;
+}
+
+const pulsar_message_id_t *pulsar_message_id_latest() {
+    boost::call_once(&initialize, initialized);
+    return &latest;
+}
+
+const void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int 
*len) {
+    std::string str;
+    messageId->messageId.serialize(str);
+    void *p = malloc(str.length());
+    memcpy(p, str.c_str(), str.length());
+    return p;
+}
+
+pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, 
uint32_t len) {
+    std::string strId((const char *)buffer, len);
+    pulsar_message_id_t *messageId = new pulsar_message_id_t;
+    messageId->messageId = pulsar::MessageId::deserialize(strId);
+    return messageId;
+}
diff --git a/pulsar-client-cpp/lib/c/c_MessageRouter.cc 
b/pulsar-client-cpp/lib/c/c_MessageRouter.cc
new file mode 100644
index 0000000..3184357
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_MessageRouter.cc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message_router.h>
+
+#include "c_structs.h"
+
+int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t 
*topicMetadata) {
+    return topicMetadata->metadata->getNumPartitions();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc 
b/pulsar-client-cpp/lib/c/c_Producer.cc
new file mode 100644
index 0000000..1de670c
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Producer.cc
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/bind.hpp>
+
+#include <pulsar/c/producer.h>
+
+#include "c_structs.h"
+
+const char *pulsar_producer_get_topic(pulsar_producer_t *producer) {
+    return producer->producer.getTopic().c_str();
+}
+
+const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer) {
+    return producer->producer.getProducerName().c_str();
+}
+
+void pulsar_producer_free(pulsar_producer_t *producer) { delete producer; }
+
+pulsar_result pulsar_producer_send(pulsar_producer_t *producer, 
pulsar_message_t *msg) {
+    msg->message = msg->builder.build();
+    return (pulsar_result)producer->producer.send(msg->message);
+}
+
+static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg,
+                                 pulsar_send_callback callback) {
+    callback((pulsar_result)result, msg);
+}
+
+void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t 
*msg,
+                                pulsar_send_callback callback) {
+    msg->message = msg->builder.build();
+    producer->producer.sendAsync(msg->message, 
boost::bind(&handle_producer_send, _1, msg, callback));
+}
+
+int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
+    return producer->producer.getLastSequenceId();
+}
+
+pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
+    return (pulsar_result)producer->producer.close();
+}
+
+void pulsar_producer_close_async(pulsar_producer_t *producer, 
pulsar_close_callback callback) {
+    producer->producer.closeAsync(boost::bind(handle_result_callback, _1, 
callback));
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
new file mode 100644
index 0000000..8cc9345
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/c/producer_configuration.h>
+#include <include/pulsar/c/message.h>
+
+#include "c_structs.h"
+
+pulsar_producer_configuration_t *pulsar_producer_configuration_create() {
+    pulsar_producer_configuration_t *c_conf = new 
pulsar_producer_configuration_t;
+    c_conf->conf = pulsar::ProducerConfiguration();
+    return c_conf;
+}
+
+void pulsar_producer_configuration_free(pulsar_producer_configuration_t *conf) 
{ delete conf; }
+
+void 
pulsar_producer_configuration_set_producer_name(pulsar_producer_configuration_t 
*conf,
+                                                     const char *producerName) 
{
+    conf->conf.setProducerName(producerName);
+}
+
+const char 
*pulsar_producer_configuration_get_producer_name(pulsar_producer_configuration_t
 *conf) {
+    return conf->conf.getProducerName().c_str();
+}
+
+void 
pulsar_producer_configuration_set_send_timeout(pulsar_producer_configuration_t 
*conf,
+                                                    int sendTimeoutMs) {
+    conf->conf.setSendTimeout(sendTimeoutMs);
+}
+
+int 
pulsar_producer_configuration_get_send_timeout(pulsar_producer_configuration_t 
*conf) {
+    return conf->conf.getSendTimeout();
+}
+
+void 
pulsar_producer_configuration_set_initial_sequence_id(pulsar_producer_configuration_t
 *conf,
+                                                           int64_t 
initialSequenceId) {
+    conf->conf.setInitialSequenceId(initialSequenceId);
+}
+
+int64_t 
pulsar_producer_configuration_get_initial_sequence_id(pulsar_producer_configuration_t
 *conf) {
+    return conf->conf.getInitialSequenceId();
+}
+
+void 
pulsar_producer_configuration_set_compression_type(pulsar_producer_configuration_t
 *conf,
+                                                        
pulsar_compression_type compressionType) {
+    conf->conf.setCompressionType((pulsar::CompressionType)compressionType);
+}
+
+pulsar_compression_type pulsar_producer_configuration_get_compression_type(
+    pulsar_producer_configuration_t *conf) {
+    return (pulsar_compression_type)conf->conf.getCompressionType();
+}
+
+void 
pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t
 *conf,
+                                                            int 
maxPendingMessages) {
+    conf->conf.setMaxPendingMessages(maxPendingMessages);
+}
+
+int 
pulsar_producer_configuration_get_max_pending_messages(pulsar_producer_configuration_t
 *conf) {
+    return conf->conf.getMaxPendingMessages();
+}
+
+void pulsar_producer_configuration_set_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf, int 
maxPendingMessagesAcrossPartitions) {
+    
conf->conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+}
+
+int pulsar_producer_configuration_get_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getMaxPendingMessagesAcrossPartitions();
+}
+
+void 
pulsar_producer_configuration_set_partitions_routing_mode(pulsar_producer_configuration_t
 *conf,
+                                                               
pulsar_partitions_routing_mode mode) {
+    
conf->conf.setPartitionsRoutingMode((pulsar::ProducerConfiguration::PartitionsRoutingMode)mode);
+}
+
+pulsar_partitions_routing_mode 
pulsar_producer_configuration_get_partitions_routing_mode(
+    pulsar_producer_configuration_t *conf) {
+    return 
(pulsar_partitions_routing_mode)conf->conf.getPartitionsRoutingMode();
+}
+
+void 
pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t
 *conf,
+                                                      pulsar_hashing_scheme 
scheme) {
+    
conf->conf.setHashingScheme((pulsar::ProducerConfiguration::HashingScheme)scheme);
+}
+
+pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(
+    pulsar_producer_configuration_t *conf) {
+    return (pulsar_hashing_scheme)conf->conf.getHashingScheme();
+}
+
+class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
+    pulsar_message_router _router;
+
+   public:
+    MessageRoutingPolicy(pulsar_message_router router) : _router(router) {}
+
+    int getPartition(const pulsar::Message &msg, const pulsar::TopicMetadata 
&topicMetadata) {
+        pulsar_message_t message;
+        message.message = msg;
+
+        pulsar_topic_metadata_t metadata;
+        metadata.metadata = &topicMetadata;
+
+        return _router(&message, &metadata);
+    }
+};
+
+void 
pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t
 *conf,
+                                                      pulsar_message_router 
router) {
+    
conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router));
+}
+
+void 
pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t
 *conf,
+                                                           int 
blockIfQueueFull) {
+    conf->conf.setBlockIfQueueFull(blockIfQueueFull);
+}
+
+int 
pulsar_producer_configuration_get_block_if_queue_full(pulsar_producer_configuration_t
 *conf) {
+    return conf->conf.getBlockIfQueueFull();
+}
+
+void 
pulsar_producer_configuration_set_batching_enabled(pulsar_producer_configuration_t
 *conf,
+                                                        int batchingEnabled) {
+    conf->conf.setBatchingEnabled(batchingEnabled);
+}
+
+int 
pulsar_producer_configuration_get_batching_enabled(pulsar_producer_configuration_t
 *conf) {
+    return conf->conf.getBatchingEnabled();
+}
+
+void 
pulsar_producer_configuration_set_batching_max_messages(pulsar_producer_configuration_t
 *conf,
+                                                             unsigned int 
batchingMaxMessages) {
+    conf->conf.setBatchingMaxMessages(batchingMaxMessages);
+}
+
+unsigned int 
pulsar_producer_configuration_get_batching_max_messages(pulsar_producer_configuration_t
 *conf) {
+    return conf->conf.getBatchingMaxMessages();
+}
+
+void pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf, unsigned long 
batchingMaxAllowedSizeInBytes) {
+    conf->conf.setBatchingMaxAllowedSizeInBytes(batchingMaxAllowedSizeInBytes);
+}
+
+unsigned long 
pulsar_producer_configuration_get_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getBatchingMaxAllowedSizeInBytes();
+}
+
+void pulsar_producer_configuration_set_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf, unsigned long 
batchingMaxPublishDelayMs) {
+    conf->conf.setBatchingMaxPublishDelayMs(batchingMaxPublishDelayMs);
+}
+
+unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getBatchingMaxPublishDelayMs();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc 
b/pulsar-client-cpp/lib/c/c_Reader.cc
new file mode 100644
index 0000000..bb5d69b
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Reader.cc
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/reader.h>
+#include <pulsar/Reader.h>
+
+#include "c_structs.h"
+
+const char *pulsar_reader_get_topic(pulsar_reader_t *reader) { return 
reader->reader.getTopic().c_str(); }
+
+pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, 
pulsar_message_t **msg) {
+    pulsar::Message message;
+    pulsar::Result res = reader->reader.readNext(message);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, 
pulsar_message_t **msg,
+                                                   int timeoutMs) {
+    pulsar::Message message;
+    pulsar::Result res = reader->reader.readNext(message, timeoutMs);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return 
(pulsar_result)reader->reader.close(); }
+
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback 
callback) {
+    reader->reader.closeAsync(boost::bind(handle_result_callback, _1, 
callback));
+}
+
+void pulsar_reader_free(pulsar_reader_t *reader) { delete reader; }
diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc 
b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
new file mode 100644
index 0000000..b6419a6
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/reader.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/ReaderConfiguration.h>
+#include <pulsar/Reader.h>
+
+#include "c_structs.h"
+
+pulsar_reader_configuration_t *pulsar_reader_configuration_create() {
+    return new pulsar_reader_configuration_t;
+}
+
+void pulsar_reader_configuration_free(pulsar_reader_configuration_t 
*configuration) { delete configuration; }
+
+static void message_listener_callback(pulsar::Reader reader, const 
pulsar::Message &msg,
+                                      pulsar_reader_listener listener) {
+    pulsar_reader_t c_reader;
+    c_reader.reader = reader;
+    pulsar_message_t *message = new pulsar_message_t;
+    message->message = msg;
+    listener(&c_reader, message);
+}
+
+void 
pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t 
*configuration,
+                                                     pulsar_reader_listener 
listener) {
+    
configuration->conf.setReaderListener(boost::bind(message_listener_callback, 
_1, _2, listener));
+}
+
+int 
pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t 
*configuration) {
+    return configuration->conf.hasReaderListener();
+}
+
+void 
pulsar_reader_configuration_set_receiver_queue_size(pulsar_reader_configuration_t
 *configuration,
+                                                         int size) {
+    configuration->conf.setReceiverQueueSize(size);
+}
+
+int 
pulsar_reader_configuration_get_receiver_queue_size(pulsar_reader_configuration_t
 *configuration) {
+    return configuration->conf.getReceiverQueueSize();
+}
+
+void pulsar_reader_configuration_set_reader_name(pulsar_reader_configuration_t 
*configuration,
+                                                 const char *readerName) {
+    configuration->conf.setReaderName(readerName);
+}
+
+const char 
*pulsar_reader_configuration_get_reader_name(pulsar_reader_configuration_t 
*configuration) {
+    return configuration->conf.getReaderName().c_str();
+}
+
+void 
pulsar_reader_configuration_set_subscription_role_prefix(pulsar_reader_configuration_t
 *configuration,
+                                                              const char 
*subscriptionRolePrefix) {
+    configuration->conf.setSubscriptionRolePrefix(subscriptionRolePrefix);
+}
+
+const char *pulsar_reader_configuration_get_subscription_role_prefix(
+    pulsar_reader_configuration_t *configuration) {
+    return configuration->conf.getSubscriptionRolePrefix().c_str();
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/c/c_Result.cc 
b/pulsar-client-cpp/lib/c/c_Result.cc
new file mode 100644
index 0000000..157a91f
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Result.cc
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/result.h>
+#include <pulsar/Result.h>
+
+const char *pulsar_result_str(pulsar_result result) { return 
pulsar::strResult((pulsar::Result)result); }
diff --git a/pulsar-client-cpp/lib/c/c_structs.h 
b/pulsar-client-cpp/lib/c/c_structs.h
new file mode 100644
index 0000000..b207ab4
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/c/result.h>
+#include <pulsar/Client.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/bind.hpp>
+
+struct _pulsar_client {
+    boost::scoped_ptr<pulsar::Client> client;
+};
+
+struct _pulsar_client_configuration {
+    pulsar::ClientConfiguration conf;
+};
+
+struct _pulsar_producer {
+    pulsar::Producer producer;
+};
+
+struct _pulsar_producer_configuration {
+    pulsar::ProducerConfiguration conf;
+};
+
+struct _pulsar_consumer {
+    pulsar::Consumer consumer;
+};
+
+struct _pulsar_consumer_configuration {
+    pulsar::ConsumerConfiguration consumerConfiguration;
+};
+
+struct _pulsar_reader {
+    pulsar::Reader reader;
+};
+
+struct _pulsar_reader_configuration {
+    pulsar::ReaderConfiguration conf;
+};
+
+struct _pulsar_message {
+    pulsar::MessageBuilder builder;
+    pulsar::Message message;
+};
+
+struct _pulsar_message_id {
+    pulsar::MessageId messageId;
+};
+
+struct _pulsar_authentication {
+    pulsar::AuthenticationPtr auth;
+};
+
+struct _pulsar_topic_metadata {
+    const pulsar::TopicMetadata* metadata;
+};
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+static void handle_result_callback(pulsar::Result result, 
pulsar_result_callback callback) {
+    callback((pulsar_result)result);
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to