This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1ab1e28 C API wrapper based on C++ client library (#1736)
1ab1e28 is described below
commit 1ab1e28c2c419b50ca7109073904510f8aa0b641
Author: Matteo Merli <[email protected]>
AuthorDate: Sun May 6 15:42:19 2018 -0700
C API wrapper based on C++ client library (#1736)
### Motivation
Added plain C API for Pulsar client.
This is implemented as a super-thin wrapper on top of existing C++ client
library. The same `libpulsar.so` will contain both implementations.
The primary use case for this API is to enable wrapping the Pulsar client
library in higher-level languages that have facilities to bind C APIs but not
C++ code. One such example is the Go language, which can generate bindings for
C headers with `cgo`.
Added some working examples of how to use the Pulsar C API; unit tests will be
added directly in Go, to cover both wrappers at once.
---
pulsar-client-cpp/.gitignore | 4 +
pulsar-client-cpp/examples/CMakeLists.txt | 24 +++
pulsar-client-cpp/examples/SampleConsumerCApi.c | 60 +++++++
.../examples/SampleConsumerListenerCApi.c | 59 +++++++
pulsar-client-cpp/examples/SampleProducerCApi.c | 63 +++++++
pulsar-client-cpp/examples/SampleReaderCApi.c | 59 +++++++
.../include/pulsar/c/authentication.h | 35 ++++
pulsar-client-cpp/include/pulsar/c/client.h | 133 ++++++++++++++
.../include/pulsar/c/client_configuration.h | 144 +++++++++++++++
pulsar-client-cpp/include/pulsar/c/consumer.h | 194 +++++++++++++++++++++
.../include/pulsar/c/consumer_configuration.h | 163 +++++++++++++++++
pulsar-client-cpp/include/pulsar/c/message.h | 168 ++++++++++++++++++
pulsar-client-cpp/include/pulsar/c/message_id.h | 53 ++++++
.../include/pulsar/c/message_router.h | 36 ++++
pulsar-client-cpp/include/pulsar/c/producer.h | 119 +++++++++++++
.../include/pulsar/c/producer_configuration.h | 147 ++++++++++++++++
pulsar-client-cpp/include/pulsar/c/reader.h | 69 ++++++++
.../include/pulsar/c/reader_configuration.h | 82 +++++++++
pulsar-client-cpp/include/pulsar/c/result.h | 81 +++++++++
pulsar-client-cpp/lib/CMakeLists.txt | 2 +-
pulsar-client-cpp/lib/c/c_Authentication.cc | 33 ++++
pulsar-client-cpp/lib/c/c_Client.cc | 142 +++++++++++++++
pulsar-client-cpp/lib/c/c_ClientConfiguration.cc | 115 ++++++++++++
pulsar-client-cpp/lib/c/c_Consumer.cc | 122 +++++++++++++
pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 104 +++++++++++
pulsar-client-cpp/lib/c/c_Message.cc | 96 ++++++++++
pulsar-client-cpp/lib/c/c_MessageId.cc | 58 ++++++
pulsar-client-cpp/lib/c/c_MessageRouter.cc | 26 +++
pulsar-client-cpp/lib/c/c_Producer.cc | 62 +++++++
pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 175 +++++++++++++++++++
pulsar-client-cpp/lib/c/c_Reader.cc | 54 ++++++
pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc | 78 +++++++++
pulsar-client-cpp/lib/c/c_Result.cc | 23 +++
pulsar-client-cpp/lib/c/c_structs.h | 80 +++++++++
34 files changed, 2862 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index c660cd1..e14b081 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -35,9 +35,13 @@ lib*.so*
# Exclude compiled executables
/examples/SampleProducer
+/examples/SampleProducerCApi
/examples/SampleConsumer
+/examples/SampleConsumerCApi
/examples/SampleAsyncProducer
/examples/SampleConsumerListener
+/examples/SampleConsumerListenerCApi
+/examples/SampleReaderCApi
/tests/main
/perf/perfProducer
/perf/perfConsumer
diff --git a/pulsar-client-cpp/examples/CMakeLists.txt
b/pulsar-client-cpp/examples/CMakeLists.txt
index fe5eb6e..6459b60 100644
--- a/pulsar-client-cpp/examples/CMakeLists.txt
+++ b/pulsar-client-cpp/examples/CMakeLists.txt
@@ -33,12 +33,36 @@ set(SAMPLE_PRODUCER_SOURCES
SampleProducer.cc
)
+set(SAMPLE_PRODUCER_C_SOURCES
+ SampleProducerCApi.c
+)
+
+set(SAMPLE_CONSUMER_C_SOURCES
+ SampleConsumerCApi.c
+)
+
+set(SAMPLE_CONSUMER_LISTENER_C_SOURCES
+ SampleConsumerListenerCApi.c
+)
+
+set(SAMPLE_READER_C_SOURCES
+ SampleReaderCApi.c
+)
+
add_executable(SampleAsyncProducer ${SAMPLE_ASYNC_PRODUCER_SOURCES})
add_executable(SampleConsumer ${SAMPLE_CONSUMER_SOURCES})
add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
add_executable(SampleProducer ${SAMPLE_PRODUCER_SOURCES})
+add_executable(SampleProducerCApi ${SAMPLE_PRODUCER_C_SOURCES})
+add_executable(SampleConsumerCApi ${SAMPLE_CONSUMER_C_SOURCES})
+add_executable(SampleConsumerListenerCApi
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
+add_executable(SampleReaderCApi ${SAMPLE_READER_C_SOURCES})
target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS})
target_link_libraries(SampleConsumer ${CLIENT_LIBS})
target_link_libraries(SampleConsumerListener ${CLIENT_LIBS})
target_link_libraries(SampleProducer ${CLIENT_LIBS})
+target_link_libraries(SampleProducerCApi ${CLIENT_LIBS})
+target_link_libraries(SampleConsumerCApi ${CLIENT_LIBS})
+target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS})
+target_link_libraries(SampleReaderCApi ${CLIENT_LIBS})
\ No newline at end of file
diff --git a/pulsar-client-cpp/examples/SampleConsumerCApi.c
b/pulsar-client-cpp/examples/SampleConsumerCApi.c
new file mode 100644
index 0000000..97b074d
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumerCApi.c
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+// Example: subscribe in shared mode, then print and acknowledge each message until a failure occurs.
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared);
+    pulsar_consumer_t *consumer;
+    pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer);
+    const int subscribed = (res == pulsar_result_Ok);
+    if (!subscribed) {
+        printf("Failed to subscribe to topic: %s\n", pulsar_result_str(res));
+    }
+    // Skipped when subscribing failed; left with `break` (not `return`) so the cleanup below always runs
+    while (res == pulsar_result_Ok) {
+        pulsar_message_t *message;
+        res = pulsar_consumer_receive(consumer, &message);
+        if (res != pulsar_result_Ok) {
+            printf("Failed to receive message: %s\n", pulsar_result_str(res));
+            break;
+        }
+        // "%.*s" expects an int precision followed by a char pointer
+        printf("Received message with payload: '%.*s'\n", (int)pulsar_message_get_length(message),
+               (const char *)pulsar_message_get_data(message));
+        pulsar_consumer_acknowledge(consumer, message);
+        pulsar_message_free(message);
+    }
+
+    // Cleanup; the consumer is only released when the subscription succeeded
+    if (subscribed) {
+        pulsar_consumer_close(consumer);
+        pulsar_consumer_free(consumer);
+    }
+    pulsar_consumer_configuration_free(consumer_conf);
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+    return res == pulsar_result_Ok ? 0 : 1;
+}
diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
new file mode 100644
index 0000000..8f3ed0d
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+// Message listener: prints the payload ("%.*s" needs an int and a char pointer), acks, then frees the message.
+static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message) {
+    printf("Received message with payload: '%.*s'\n", (int)pulsar_message_get_length(message),
+           (const char *)pulsar_message_get_data(message));
+    pulsar_consumer_acknowledge(consumer, message);
+    pulsar_message_free(message);
+}
+
+// Example: consume "my-topic" through a message listener until input (or EOF) arrives on stdin.
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared);
+    pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback);
+
+    pulsar_consumer_t *consumer;
+    pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer);
+    if (res == pulsar_result_Ok) {
+        // Block main thread
+        fgetc(stdin);
+
+        printf("\nClosing consumer\n");
+        pulsar_consumer_close(consumer);
+        pulsar_consumer_free(consumer);
+    } else {
+        printf("Failed to subscribe to topic: %s\n", pulsar_result_str(res));
+    }
+
+    // Cleanup shared by the success and failure paths
+    pulsar_consumer_configuration_free(consumer_conf);
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+    return res == pulsar_result_Ok ? 0 : 1;
+}
diff --git a/pulsar-client-cpp/examples/SampleProducerCApi.c
b/pulsar-client-cpp/examples/SampleProducerCApi.c
new file mode 100644
index 0000000..74853e2
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleProducerCApi.c
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client.h>
+
+#include <stdio.h>
+#include <string.h>
+
+// Example: publish ten messages to "my-topic", stopping at the first failure.
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+
+    pulsar_producer_configuration_t* producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_configuration_set_batching_enabled(producer_conf, 1);
+    pulsar_producer_t *producer;
+    pulsar_result err = pulsar_client_create_producer(client, "my-topic", producer_conf, &producer);
+    const int created = (err == pulsar_result_Ok);
+    if (!created) {
+        printf("Failed to create producer: %s\n", pulsar_result_str(err));
+    }
+
+    // Never entered when the producer was not created; stops after the first failed send
+    for (int i = 0; i < 10 && err == pulsar_result_Ok; i++) {
+        const char* data = "my-content";
+        pulsar_message_t* message = pulsar_message_create();
+        pulsar_message_set_content(message, data, strlen(data));
+        err = pulsar_producer_send(producer, message);
+        if (err == pulsar_result_Ok) {
+            printf("Sent message %d\n", i);
+        } else {
+            printf("Failed to publish message: %s\n", pulsar_result_str(err));
+        }
+        // Free the message whether or not the send succeeded
+        pulsar_message_free(message);
+    }
+
+    // Cleanup; the producer is only released when it was actually created
+    if (created) {
+        pulsar_producer_close(producer);
+        pulsar_producer_free(producer);
+    }
+    pulsar_producer_configuration_free(producer_conf);
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+    return err == pulsar_result_Ok ? 0 : 1;
+}
diff --git a/pulsar-client-cpp/examples/SampleReaderCApi.c
b/pulsar-client-cpp/examples/SampleReaderCApi.c
new file mode 100644
index 0000000..c0eec06
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleReaderCApi.c
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+// Example: read "my-topic" from the earliest available message and print each payload until a failure.
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+    pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create();
+    pulsar_reader_t *reader;
+    pulsar_result res = pulsar_client_create_reader(client, "my-topic", pulsar_message_id_earliest(), reader_conf,
+                                                    &reader);
+    const int created = (res == pulsar_result_Ok);
+    if (!created) {
+        printf("Failed to create reader: %s\n", pulsar_result_str(res));
+    }
+    // Skipped when the reader was not created; left with `break` (not `return`) so the cleanup below always runs
+    while (res == pulsar_result_Ok) {
+        pulsar_message_t *message;
+        res = pulsar_reader_read_next(reader, &message);
+        if (res != pulsar_result_Ok) {
+            printf("Failed to read message: %s\n", pulsar_result_str(res));
+            break;
+        }
+        // "%.*s" expects an int precision followed by a char pointer
+        printf("Received message with payload: '%.*s'\n", (int)pulsar_message_get_length(message),
+               (const char *)pulsar_message_get_data(message));
+        pulsar_message_free(message);
+    }
+
+    // Cleanup; the reader is only released when it was actually created
+    if (created) {
+        pulsar_reader_close(reader);
+        pulsar_reader_free(reader);
+    }
+    pulsar_reader_configuration_free(reader_conf);
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+    return res == pulsar_result_Ok ? 0 : 1;
+}
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h
b/pulsar-client-cpp/include/pulsar/c/authentication.h
new file mode 100644
index 0000000..6f405c2
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/authentication.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_authentication pulsar_authentication_t;
+
+pulsar_authentication_t *pulsar_authentication_create(const char
*dynamicLibPath,
+ const char
*authParamsString);
+
+void pulsar_authentication_free(pulsar_authentication_t *authentication);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h
b/pulsar-client-cpp/include/pulsar/c/client.h
new file mode 100644
index 0000000..11320e7
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/c/client_configuration.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/message_id.h>
+#include <pulsar/c/producer.h>
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/reader.h>
+#include <pulsar/c/consumer_configuration.h>
+#include <pulsar/c/producer_configuration.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/c/result.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_client pulsar_client_t;
+typedef struct _pulsar_producer pulsar_producer_t;
+
+typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
+typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
+
+typedef void (*pulsar_create_producer_callback)(pulsar_result result,
pulsar_producer_t *producer);
+
+typedef void (*pulsar_subscribe_callback)(pulsar_result result,
pulsar_consumer_t *consumer);
+typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t
*reader);
+
+typedef void (*pulsar_close_callback)(pulsar_result result);
+
+/**
+ * Create a Pulsar client object connecting to the specified cluster address
and using the specified
+ * configuration.
+ *
+ * @param serviceUrl the Pulsar endpoint to use (eg:
pulsar://broker-example.com:6650)
+ * @param clientConfiguration the client configuration to use
+ */
+pulsar_client_t *pulsar_client_create(const char *serviceUrl,
+ const pulsar_client_configuration_t
*clientConfiguration);
+
+/**
+ * Create a producer with the given configuration
+ *
+ * @see C++ API: createProducer(const std::string&, const ProducerConfiguration&, Producer&)
+ *
+ * @param topic the topic where the new producer will publish
+ * @param conf the producer configuration to use
+ * @param producer pointer to the location that receives the new producer on success
+ * @return pulsar_result_Ok if the producer has been successfully created, an error result otherwise
+ */
+pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const
char *topic,
+ const
pulsar_producer_configuration_t *conf,
+ pulsar_producer_t **producer);
+
+void pulsar_client_create_producer_async(pulsar_client_t *client, const char
*topic,
+ const pulsar_producer_configuration_t
*conf,
+ pulsar_create_producer_callback
callback);
+
+pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char
*topic,
+ const char *subscriptionName,
+ const pulsar_consumer_configuration_t
*conf,
+ pulsar_consumer_t **consumer);
+
+void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic,
const char *subscriptionName,
+ const pulsar_consumer_configuration_t
*conf, pulsar_consumer_t **consumer,
+ pulsar_subscribe_callback callback);
+
+/**
+ * Create a topic reader with given {@code ReaderConfiguration} for reading
messages from the specified
+ * topic.
+ * <p>
+ * The Reader provides a low-level abstraction that allows for manual
positioning in the topic, without
+ * using a
+ * subscription. Reader can only work on non-partitioned topics.
+ * <p>
+ * The initial reader positioning is done by specifying a message id. The
options are:
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Start reading from the earliest
message available in the topic
+ * <li><code>MessageId.latest</code> : Start reading from the end of the topic, only getting messages
+ * published after the
+ * reader was created
+ * <li><code>MessageId</code> : When passing a particular message id, the
reader will position itself on
+ * that
+ * specific position. The first message to be read will be the message next to
the specified messageId.
+ * </ul>
+ *
+ * @param topic
+ * The name of the topic where to read
+ * @param startMessageId
+ * The message id where the reader will position itself. The first
message returned will be the
+ * one after
+ * the specified startMessageId
+ * @param conf
+ * The {@code ReaderConfiguration} object
+ * @return the result of the operation; on success the new reader is stored in {@code *reader}
+ */
+pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char
*topic,
+ const pulsar_message_id_t
*startMessageId,
+ pulsar_reader_configuration_t *conf,
pulsar_reader_t **reader);
+
+void pulsar_client_create_reader_async(pulsar_client_t *client, const char
*topic,
+ const pulsar_message_id_t
*startMessageId,
+ pulsar_reader_configuration_t *conf,
pulsar_reader_t **reader,
+ pulsar_reader_callback callback);
+
+pulsar_result pulsar_client_close(pulsar_client_t *client);
+
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback
callback);
+
+void pulsar_client_free(pulsar_client_t *client);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/client_configuration.h
b/pulsar-client-cpp/include/pulsar/c/client_configuration.h
new file mode 100644
index 0000000..47f78db
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/client_configuration.h
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
+typedef struct _pulsar_authentication pulsar_authentication_t;
+
+/* Create / release a client configuration. `(void)` makes the C declaration a prototype taking no arguments. */
+pulsar_client_configuration_t *pulsar_client_configuration_create(void);
+void pulsar_client_configuration_free(pulsar_client_configuration_t *conf);
+
+/**
+ * Set the authentication method to be used with the broker
+ *
+ * @param authentication the authentication data to use
+ */
+void pulsar_client_configuration_set_auth(pulsar_client_configuration_t *conf,
+ pulsar_authentication_t
*authentication);
+
+/**
+ * Set timeout on client operations (subscribe, create producer, close,
unsubscribe)
+ * Default is 30 seconds.
+ *
+ * @param timeout the timeout after which the operation will be considered as
failed
+ */
+void
pulsar_client_configuration_set_operation_timeout_seconds(pulsar_client_configuration_t
*conf,
+ int timeout);
+
+/**
+ * @return the client operations timeout in seconds
+ */
+int
pulsar_client_configuration_get_operation_timeout_seconds(pulsar_client_configuration_t
*conf);
+
+/**
+ * Set the number of IO threads to be used by the Pulsar client. Default is 1
+ * thread.
+ *
+ * @param threads number of threads
+ */
+void pulsar_client_configuration_set_io_threads(pulsar_client_configuration_t
*conf, int threads);
+
+/**
+ * @return the number of IO threads to use
+ */
+int pulsar_client_configuration_get_io_threads(pulsar_client_configuration_t
*conf);
+
+/**
+ * Set the number of threads to be used by the Pulsar client when delivering
messages
+ * through message listener. Default is 1 thread per Pulsar client.
+ *
+ * If using more than 1 thread, messages for distinct MessageListener will be
+ * delivered in different threads, however a single MessageListener will always
+ * be assigned to the same thread.
+ *
+ * @param threads number of threads
+ */
+void
pulsar_client_configuration_set_message_listener_threads(pulsar_client_configuration_t
*conf,
+ int threads);
+
+/**
+ * @return the number of message listener threads to use
+ */
+int
pulsar_client_configuration_get_message_listener_threads(pulsar_client_configuration_t
*conf);
+
+/**
+ * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * <i>(default: 50000)</i> It should be configured with a higher value only when the client needs to
+ * produce/subscribe on
+ * thousands of topics using the created {@link PulsarClient}
+ *
+ * @param concurrentLookupRequest
+ */
+void
pulsar_client_configuration_set_concurrent_lookup_request(pulsar_client_configuration_t
*conf,
+ int
concurrentLookupRequest);
+
+/**
+ * @return Get configured total allowed concurrent lookup-request.
+ */
+int
pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_configuration_t
*conf);
+
+/**
+ * Initialize the log configuration
+ *
+ * @param logConfFilePath path of the configuration file
+ */
+void
pulsar_client_configuration_set_log_conf_file_path(pulsar_client_configuration_t
*conf,
+ const char
*logConfFilePath);
+
+/**
+ * Get the path of log configuration file (log4cpp)
+ */
+const char
*pulsar_client_configuration_get_log_conf_file_path(pulsar_client_configuration_t
*conf);
+
+void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t
*conf, int useTls);
+
+int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t
*conf);
+
+void
pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_configuration_t
*conf,
+ const char
*tlsTrustCertsFilePath);
+
+const char
*pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t
*conf);
+
+void
pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t
*conf,
+ int
allowInsecure);
+
+int
pulsar_client_configuration_is_tls_allow_insecure_connection(pulsar_client_configuration_t
*conf);
+
+/*
+ * Initialize stats interval in seconds. Stats are printed and reset after
every 'statsIntervalInSeconds'.
+ * Set to 0 in order to disable stats collection.
+ */
+void
pulsar_client_configuration_set_stats_interval_in_seconds(pulsar_client_configuration_t
*conf,
+ const unsigned
int interval);
+
+/*
+ * Get the stats interval set in the client.
+ */
+const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds(
+ pulsar_client_configuration_t *conf);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h
b/pulsar-client-cpp/include/pulsar/c/consumer.h
new file mode 100644
index 0000000..59c99e3
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#include <stdint.h>
+
+typedef struct _pulsar_consumer pulsar_consumer_t;
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+/**
+ * @return the topic this consumer is subscribed to
+ */
+const char *pulsar_consumer_get_topic(pulsar_consumer_t *consumer);
+
+/**
+ * @return the subscription name
+ */
+const char *pulsar_consumer_get_subscription_name(pulsar_consumer_t *consumer);
+
+/**
+ * Unsubscribe the current consumer from the topic.
+ *
+ * This method will block until the operation is completed. Once the consumer
is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @see asyncUnsubscribe
+ * @return Result::ResultOk if the unsubscribe operation completed successfully
+ * @return Result::ResultError if the unsubscribe operation failed
+ */
+pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer);
+
+/**
+ * Asynchronously unsubscribe the current consumer from the topic.
+ *
+ * This method will initiate the operation and return immediately. Once the consumer is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @param callback the callback to get notified when the operation is complete
+ */
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer,
pulsar_result_callback callback);
+
+/**
+ * Receive a single message.
+ *
+ * If a message is not immediately available, this method will block until a
new
+ * message is available.
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @return ResultOk when a message is received
+ * @return ResultInvalidConfiguration if a message listener had been set in
the configuration
+ */
+pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer,
pulsar_message_t **msg);
+
+/**
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @param timeoutMs the receive timeout in milliseconds
+ * @return ResultOk if a message was received
+ * @return ResultTimeout if the receive timeout was triggered
+ * @return ResultInvalidConfiguration if a message listener had been set in
the configuration
+ */
+pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t
*consumer, pulsar_message_t **msg,
+ int timeoutMs);
+
+/**
+ * Acknowledge the reception of a single message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the message will not be re-delivered to this consumer.
+ *
+ * @see asyncAcknowledge
+ * @param message the message to acknowledge
+ * @return ResultOk if the message was successfully acknowledged
+ * @return ResultError if there was a failure
+ */
+pulsar_result pulsar_consumer_acknowledge(pulsar_consumer_t *consumer,
pulsar_message_t *message);
+
+pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer,
pulsar_message_id_t *messageId);
+
+/**
+ * Asynchronously acknowledge the reception of a single message.
+ *
+ * This method will initiate the operation and return immediately. The
provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been
acknowledged
+ */
+void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer,
pulsar_message_t *message,
+ pulsar_result_callback callback);
+
+void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer,
pulsar_message_id_t *messageId,
+ pulsar_result_callback callback);
+
+/**
+ * Acknowledge the reception of all the messages in the stream up to (and
including)
+ * the provided message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the messages will not be re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to
ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(const Message&,
ResultCallback) and
+ * waiting for the callback to be triggered.
+ *
+ * @param message the last message in the stream to acknowledge
+ * @return ResultOk if the message was successfully acknowledged. All
previously delivered messages for
+ * this topic are also acknowledged.
+ * @return ResultError if there was a failure
+ */
+pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t
*consumer, pulsar_message_t *message);
+
+pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t
*consumer,
+ pulsar_message_id_t
*messageId);
+
+/**
+ * Asynchronously acknowledge the reception of all the messages in the stream
up to (and
+ * including) the provided message.
+ *
+ * This method will initiate the operation and return immediately. The
provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been
acknowledged
+ */
+void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer,
pulsar_message_t *message,
+ pulsar_result_callback
callback);
+
+void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t
*consumer,
+ pulsar_message_id_t
*messageId,
+ pulsar_result_callback
callback);
+
+pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
+
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer,
pulsar_result_callback callback);
+
+void pulsar_consumer_free(pulsar_consumer_t *consumer);
+
+/*
+ * Pause receiving messages via the messageListener, till
resumeMessageListener() is called.
+ */
+pulsar_result pulsar_consumer_pause_message_listener(pulsar_consumer_t
*consumer);
+
+/*
+ * Resume receiving the messages via the messageListener. Asynchronously receive all the messages enqueued
+ * from time pauseMessageListener() was called. NOTE(review): name lacks the pulsar_consumer_ prefix - confirm.
+ */
+pulsar_result resume_message_listener(pulsar_consumer_t *consumer);
+
+/**
+ * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is
+ * not active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed
+ * across all the connected consumers. This is a non blocking call and doesn't throw an exception. In case
+ * the connection breaks, the messages are redelivered after reconnect.
+ *
+ * NOTE(review): unlike the other functions in this header, this name is camelCase and lacks the
+ * `pulsar_consumer_` prefix; confirm whether it should be renamed together with its definition.
+ */
+void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
new file mode 100644
index 0000000..3bd9571
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_consumer_configuration pulsar_consumer_configuration_t;
+
+typedef enum {
+ /**
+ * There can be only 1 consumer on the same topic with the same
consumerName
+ */
+ pulsar_ConsumerExclusive,
+
+ /**
+ * Multiple consumers will be able to use the same consumerName and the
messages
+ * will be dispatched according to a round-robin rotation between the
connected consumers
+ */
+ pulsar_ConsumerShared,
+
+ /** Only one consumer is active on the subscription; the subscription can have
N consumers
+ * connected, one of which will get promoted to master if the current
master becomes inactive
+ */
+ pulsar_ConsumerFailover
+} pulsar_consumer_type;
+
+/// Callback definition for MessageListener
+typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer,
pulsar_message_t *msg);
+
+pulsar_consumer_configuration_t *pulsar_consumer_configuration_create();
+
+void pulsar_consumer_configuration_free(pulsar_consumer_configuration_t
*consumer_configuration);
+
+/**
+ * Specify the consumer type. The consumer type enables
+ * specifying the type of subscription. In Exclusive subscription,
+ * only a single consumer is allowed to attach to the subscription. Other
consumers
+ * will get an error message. In Shared subscription, multiple consumers will
be
+ * able to use the same subscription name and the messages will be dispatched
in a
+ * round robin fashion. In Failover subscription, a primary-failover
subscription model
+ * allows for multiple consumers to attach to a single subscription, though
only one
+ * of them will be the "primary" at a given time. Only the primary consumer will
receive
+ * messages. When the primary consumer gets disconnected, one among the
failover
+ * consumers will be promoted to primary and will start getting messages.
+ */
+void
pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configuration_t
*consumer_configuration,
+ pulsar_consumer_type
consumerType);
+
+pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
+ pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * A message listener enables your application to configure how to process
+ * and acknowledge messages delivered. A listener will be called in order
+ * for every message received.
+ */
+void pulsar_consumer_configuration_set_message_listener(
+ pulsar_consumer_configuration_t *consumer_configuration,
pulsar_message_listener messageListener);
+
+int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t
*consumer_configuration,
+ pulsar_consumer_t *consumer);
+
+/**
+ * Sets the size of the consumer receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by
the Consumer before the
+ * application calls receive(). Using a higher value could potentially
increase the consumer throughput
+ * at the expense of bigger memory utilization.
+ *
+ * Setting the consumer queue size as zero decreases the throughput of the
consumer, by disabling
+ * pre-fetching of
+ * messages. This approach improves the message distribution on shared
subscription, by pushing messages
+ * only to
+ * the consumers that are ready to process them. Neither receive with timeout
nor Partitioned Topics can
+ * be
+ * used if the consumer queue size is zero. The receive() function call should
not be interrupted when
+ * the consumer queue size is zero.
+ *
+ * Default value is 1000 messages and should be good for most use cases.
+ *
+ * @param size
+ * the new receiver queue size value
+ */
+void pulsar_consumer_configuration_set_receiver_queue_size(
+ pulsar_consumer_configuration_t *consumer_configuration, int size);
+
+int pulsar_consumer_configuration_get_receiver_queue_size(
+ pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * Set the max total receiver queue size across partitions.
+ * <p>
+ * This setting will be used to reduce the receiver queue size for individual
partitions
+ * {@link #setReceiverQueueSize(int)} if the total exceeds this value
(default: 50000).
+ *
+ * @param maxTotalReceiverQueueSizeAcrossPartitions
+ */
+void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
+ pulsar_consumer_configuration_t *consumer_configuration, int
maxTotalReceiverQueueSizeAcrossPartitions);
+
+/**
+ * @return the configured max total receiver queue size across partitions
+ */
+int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions(
+ pulsar_consumer_configuration_t *consumer_configuration);
+
+void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t
*consumer_configuration,
+ const char *consumerName);
+
+const char *pulsar_consumer_get_consumer_name(pulsar_consumer_configuration_t
*consumer_configuration);
+
+/**
+ * Set the timeout in milliseconds for unacknowledged messages, the timeout
needs to be greater than
+ * 10 seconds. An Exception is thrown if the given value is less than 10000
(10 seconds).
+ * If a successful acknowledgement is not sent within the timeout all the
unacknowledged messages are
+ * redelivered.
+ * @param timeout in milliseconds
+ */
+void
pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configuration_t
*consumer_configuration,
+ const uint64_t
milliSeconds);
+
+/**
+ * @return the configured timeout in milliseconds for unacked messages.
+ */
+long
pulsar_consumer_get_unacked_messages_timeout_ms(pulsar_consumer_configuration_t
*consumer_configuration);
+
+int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t
*consumer_configuration);
+
+// const CryptoKeyReaderPtr getCryptoKeyReader()
+//
+// const;
+// ConsumerConfiguration&
+// setCryptoKeyReader(CryptoKeyReaderPtr
+// cryptoKeyReader);
+//
+// ConsumerCryptoFailureAction getCryptoFailureAction()
+//
+// const;
+// ConsumerConfiguration&
+// setCryptoFailureAction(ConsumerCryptoFailureAction
+// action);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h
b/pulsar-client-cpp/include/pulsar/c/message.h
new file mode 100644
index 0000000..9645b02
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+#include <stdint.h>
+
+typedef struct _pulsar_message pulsar_message_t;
+typedef struct _pulsar_message_id pulsar_message_id_t;
+
+pulsar_message_t *pulsar_message_create();
+void pulsar_message_free(pulsar_message_t *message);
+
+/// Builder
+
+void pulsar_message_set_content(pulsar_message_t *message, const void *data,
size_t size);
+
+/**
+ * Set content of the message to a buffer already allocated by the caller. No
copies of
+ * this buffer will be made. The caller is responsible to ensure the memory
buffer is
+ * valid until the message has been persisted (or an error is returned).
+ */
+void pulsar_message_set_allocated_content(pulsar_message_t *message, void
*data, size_t size);
+
+void pulsar_message_set_property(pulsar_message_t *message, const char *name,
const char *value);
+
+/*
+ * Set the partition key used for message routing.
+ * @param partitionKey the key whose hash is used to determine the message's topic partition
+ */
+void pulsar_message_set_partition_key(pulsar_message_t *message, const char
*partitionKey);
+
+/**
+ * Set the event timestamp for the message.
+ */
+void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t
eventTimestamp);
+
+/**
+ * Specify a custom sequence id for the message being published.
+ * <p>
+ * The sequence id can be used for deduplication purposes and it needs to
follow these rules:
+ * <ol>
+ * <li><code>sequenceId >= 0</code>
+ * <li>Sequence id for a message needs to be greater than sequence id for
earlier messages:
+ * <code>sequenceId(N+1) > sequenceId(N)</code>
+ * <li>It's not necessary for sequence ids to be consecutive. There can be
holes between messages. Eg. the
+ * <code>sequenceId</code> could represent an offset or a cumulative size.
+ * </ol>
+ *
+ * @param sequenceId
+ * the sequence id to assign to the current message
+ */
+void pulsar_message_set_sequence_id(pulsar_message_t *message, int64_t
sequenceId);
+
+/**
+ * override namespace replication clusters. note that it is the
+ * caller's responsibility to provide valid cluster names, and that
+ * all clusters have been previously configured as topics.
+ *
+ * given an empty list, the message will replicate per the namespace
+ * configuration.
+ *
+ * @param clusters where to send this message.
+ */
+void pulsar_message_set_replication_clusters(pulsar_message_t *message, const
char **clusters);
+
+/**
+ * Do not replicate this message
+ * @param flag if true, disable replication, otherwise use default
+ * replication
+ */
+void pulsar_message_disable_replication(pulsar_message_t *message, int flag);
+
+/// Accessor for built messages
+
+/**
+ * Return the properties attached to the message.
+ * Properties are application defined key/value pairs that will be attached to
the message
+ *
+ * @return an unmodifiable view of the properties map
+ */
+// const StringMap& getProperties() const;
+
+/**
+ * Check whether the message has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the message has the specified property
+ * @return false if the property is not defined
+ */
+int pulsar_message_has_property(pulsar_message_t *message, const char *name);
+
+/**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or null if the property was not defined
+ */
+const char *pulsar_message_get_property(pulsar_message_t *message, const char
*name);
+
+/**
+ * Get the content of the message
+ *
+ *
+ * @return the pointer to the message payload
+ */
+const void *pulsar_message_get_data(pulsar_message_t *message);
+
+/**
+ * Get the length of the message
+ *
+ * @return the length of the message payload
+ */
+uint32_t pulsar_message_get_length(pulsar_message_t *message);
+
+/**
+ * Get the unique message ID associated with this message.
+ *
+ * The message id can be used to univocally refer to a message without having
to keep the entire payload
+ * in memory.
+ *
+ * Only messages received from the consumer will have a message id assigned.
+ *
+ */
+pulsar_message_id_t *pulsar_message_get_message_id(pulsar_message_t *message);
+
+/**
+ * Get the partition key for this message
+ * @return key string that is hashed to determine message's topic partition
+ */
+const char *pulsar_message_get_partitionKey(pulsar_message_t *message);
+int pulsar_message_has_partition_key(pulsar_message_t *message);
+
+/**
+ * Get the UTC based timestamp in milliseconds referring to when the message
was published by the client
+ * producer
+ */
+uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message);
+
+/**
+ * Get the event timestamp associated with this message. It is set by the
client producer.
+ */
+uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_id.h
b/pulsar-client-cpp/include/pulsar/c/message_id.h
new file mode 100644
index 0000000..a0eb684
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message_id.h
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+#include <stdint.h>
+
+typedef struct _pulsar_message_id pulsar_message_id_t;
+
+/**
+ * MessageId representing the "earliest" or "oldest available" message stored
in the topic
+ */
+const pulsar_message_id_t *pulsar_message_id_earliest();
+
+/**
+ * MessageId representing the "latest" or "last published" message in the topic
+ */
+const pulsar_message_id_t *pulsar_message_id_latest();
+
+/**
+ * Serialize the message id into a binary string for storing
+ */
+const void *pulsar_message_id_serialize(int *len);
+
+/**
+ * Deserialize a message id from a binary string
+ */
+pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer,
uint32_t len);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_router.h
b/pulsar-client-cpp/include/pulsar/c/message_router.h
new file mode 100644
index 0000000..aea4188
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message_router.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/c/message.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_topic_metadata pulsar_topic_metadata_t;
+
+typedef int (*pulsar_message_router)(pulsar_message_t *msg,
pulsar_topic_metadata_t *topicMetadata);
+
+int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t
*topicMetadata);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/producer.h
b/pulsar-client-cpp/include/pulsar/c/producer.h
new file mode 100644
index 0000000..6121e06
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/producer.h
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#include <stdint.h>
+
+typedef struct _pulsar_producer pulsar_producer_t;
+
+typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg);
+typedef void (*pulsar_close_callback)(pulsar_result);
+
+/**
+ * @return the topic to which the producer is publishing
+ */
+const char *pulsar_producer_get_topic(pulsar_producer_t *producer);
+
+/**
+ * @return the producer name which could have been assigned by the system or
specified by the client
+ */
+const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer);
+
+/**
+ * Publish a message on the topic associated with this Producer.
+ *
+ * This method will block until the message has been accepted and persisted
+ * by the broker. In case of errors, the client library will try to
+ * automatically recover and use a different broker.
+ *
+ * If it wasn't possible to successfully publish the message within the
sendTimeout,
+ * an error will be returned.
+ *
+ * This method is equivalent to calling asyncSend() and waiting until the callback is
triggered.
+ *
+ * @param msg message to publish
+ * @return ResultOk if the message was published successfully
+ * @return ResultWriteError if it wasn't possible to publish the message
+ */
+pulsar_result pulsar_producer_send(pulsar_producer_t *producer,
pulsar_message_t *msg);
+
+/**
+ * Asynchronously publish a message on the topic associated with this Producer.
+ *
+ * This method will initiate the publish operation and return immediately. The
+ * provided callback will be triggered when the message has been accepted
and persisted
+ * by the broker. In case of errors, the client library will try to
+ * automatically recover and use a different broker.
+ *
+ * If it wasn't possible to successfully publish the message within the
sendTimeout, the
+ * callback will be triggered with a Result::WriteError code.
+ *
+ * @param msg message to publish
+ * @param callback the callback to get notification of the completion
+ */
+void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t
*msg,
+ pulsar_send_callback callback);
+
+/**
+ * Get the last sequence id that was published by this producer.
+ *
+ * This represents either the automatically assigned or custom sequence id (set
on the MessageBuilder) that
+ * was published and acknowledged by the broker.
+ *
+ * After recreating a producer with the same producer name, this will return
the last sequence id that was
+ * published in
+ * the previous producer session, or -1 if no message was ever published.
+ *
+ * @return the last sequence id published by this producer
+ */
+int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer);
+
+/**
+ * Close the producer and release resources allocated.
+ *
+ * No more writes will be accepted from this producer. Waits until
+ * all pending write requests are persisted. In case of errors,
+ * pending writes will not be retried.
+ *
+ * @return an error code to indicate the success or failure
+ */
+pulsar_result pulsar_producer_close(pulsar_producer_t *producer);
+
+/**
+ * Close the producer and release resources allocated.
+ *
+ * No more writes will be accepted from this producer. The provided callback
will be
+ * triggered when all pending write requests are persisted. In case of errors,
+ * pending writes will not be retried.
+ */
+void pulsar_producer_close_async(pulsar_producer_t *producer,
pulsar_close_callback callback);
+
+void pulsar_producer_free(pulsar_producer_t *producer);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
new file mode 100644
index 0000000..534c411
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+
+#include <pulsar/c/message_router.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef enum {
+ pulsar_UseSinglePartition,
+ pulsar_RoundRobinDistribution,
+ pulsar_CustomPartition
+} pulsar_partitions_routing_mode;
+
+typedef enum { pulsar_Murmur3_32Hash, pulsar_BoostHash, pulsar_JavaStringHash
} pulsar_hashing_scheme;
+
+typedef enum {
+ pulsar_CompressionNone = 0,
+ pulsar_CompressionLZ4 = 1,
+ pulsar_CompressionZLib = 2
+} pulsar_compression_type;
+
+typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
+
+pulsar_producer_configuration_t *pulsar_producer_configuration_create();
+
+void pulsar_producer_configuration_free(pulsar_producer_configuration_t *conf);
+
+void
pulsar_producer_configuration_set_producer_name(pulsar_producer_configuration_t
*conf,
+ const char *producerName);
+
+const char
*pulsar_producer_configuration_get_producer_name(pulsar_producer_configuration_t
*conf);
+
+void
pulsar_producer_configuration_set_send_timeout(pulsar_producer_configuration_t
*conf, int sendTimeoutMs);
+
+int
pulsar_producer_configuration_get_send_timeout(pulsar_producer_configuration_t
*conf);
+
+void
pulsar_producer_configuration_set_initial_sequence_id(pulsar_producer_configuration_t
*conf,
+ int64_t
initialSequenceId);
+
+int64_t
pulsar_producer_configuration_get_initial_sequence_id(pulsar_producer_configuration_t
*conf);
+
+void
pulsar_producer_configuration_set_compression_type(pulsar_producer_configuration_t
*conf,
+
pulsar_compression_type compressionType);
+
+pulsar_compression_type pulsar_producer_configuration_get_compression_type(
+ pulsar_producer_configuration_t *conf);
+
+void
pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t
*conf,
+ int
maxPendingMessages);
+
+int
pulsar_producer_configuration_get_max_pending_messages(pulsar_producer_configuration_t
*conf);
+
+/**
+ * Set the number of max pending messages across all the partitions
+ * <p>
+ * This setting will be used to lower the max pending messages for each
partition
+ * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured
value.
+ *
+ * @param maxPendingMessagesAcrossPartitions
+ */
+void pulsar_producer_configuration_set_max_pending_messages_across_partitions(
+ pulsar_producer_configuration_t *conf, int
maxPendingMessagesAcrossPartitions);
+
+/**
+ *
+ * @return the maximum number of pending messages allowed across all the
partitions
+ */
+int pulsar_producer_configuration_get_max_pending_messages_across_partitions(
+ pulsar_producer_configuration_t *conf);
+
+void
pulsar_producer_configuration_set_partitions_routing_mode(pulsar_producer_configuration_t
*conf,
+
pulsar_partitions_routing_mode mode);
+
+pulsar_partitions_routing_mode
pulsar_producer_configuration_get_partitions_routing_mode(
+ pulsar_producer_configuration_t *conf);
+
+void
pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t
*conf,
+ pulsar_message_router
router);
+
+void
pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t
*conf,
+ pulsar_hashing_scheme
scheme);
+
+pulsar_hashing_scheme
pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t
*conf);
+
+void
pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t
*conf,
+ int
blockIfQueueFull);
+
+int
pulsar_producer_configuration_get_block_if_queue_full(pulsar_producer_configuration_t
*conf);
+
+// Zero queue size feature will not be supported on consumer end if batching
is enabled
+void
pulsar_producer_configuration_set_batching_enabled(pulsar_producer_configuration_t
*conf,
+ int batchingEnabled);
+
+int
pulsar_producer_configuration_get_batching_enabled(pulsar_producer_configuration_t
*conf);
+
+void
pulsar_producer_configuration_set_batching_max_messages(pulsar_producer_configuration_t
*conf,
+ unsigned int
batchingMaxMessages);
+
+unsigned int
pulsar_producer_configuration_get_batching_max_messages(pulsar_producer_configuration_t
*conf);
+
+void pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+ pulsar_producer_configuration_t *conf, unsigned long
batchingMaxAllowedSizeInBytes);
+
+unsigned long
pulsar_producer_configuration_get_batching_max_allowed_size_in_bytes(
+ pulsar_producer_configuration_t *conf);
+
+void
pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_producer_configuration_t
*conf,
+ unsigned
long batchingMaxPublishDelayMs);
+
+unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
+ pulsar_producer_configuration_t *conf);
+
+// const CryptoKeyReaderPtr getCryptoKeyReader() const;
+// ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr
cryptoKeyReader);
+//
+// ProducerCryptoFailureAction getCryptoFailureAction() const;
+// ProducerConfiguration &setCryptoFailureAction(ProducerCryptoFailureAction
action);
+//
+// std::set <std::string> &getEncryptionKeys();
+// int isEncryptionEnabled() const;
+// ProducerConfiguration &addEncryptionKey(std::string key);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/reader.h
b/pulsar-client-cpp/include/pulsar/c/reader.h
new file mode 100644
index 0000000..648c172
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/reader.h
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_reader pulsar_reader_t;
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+/**
+ * @return the topic this reader is reading from
+ */
+const char *pulsar_reader_get_topic(pulsar_reader_t *reader);
+
+/**
+ * Read a single message.
+ *
+ * If a message is not immediately available, this method will block until a
new
+ * message is available.
+ *
+ * @param msg output parameter: pointer to the message pointer that receives the message
+ * @return pulsar_result_Ok when a message is received
+ * @return pulsar_result_InvalidConfiguration if a message listener had been set in
the configuration
+ */
+pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader,
pulsar_message_t **msg);
+
+/**
+ * Read a single message
+ *
+ * @param msg output parameter: pointer to the message pointer that receives the message
+ * @param timeoutMs the receive timeout in milliseconds
+ * @return pulsar_result_Ok if a message was received
+ * @return pulsar_result_Timeout if the receive timeout was triggered
+ * @return pulsar_result_InvalidConfiguration if a message listener had been set in
the configuration
+ */
+pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader,
pulsar_message_t **msg,
+ int timeoutMs);
+
+pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
+
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback
callback);
+
+void pulsar_reader_free(pulsar_reader_t *reader);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
new file mode 100644
index 0000000..914bbd9
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_reader_configuration pulsar_reader_configuration_t;
+
+typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader,
pulsar_message_t *msg);
+
+pulsar_reader_configuration_t *pulsar_reader_configuration_create();
+
+void pulsar_reader_configuration_free(pulsar_reader_configuration_t
*configuration);
+
+/**
+ * A message listener enables your application to configure how to process
+ * messages. A listener will be called in order for every message received.
+ */
+void
pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t
*configuration,
+ pulsar_reader_listener
listener);
+
+int
pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t
*configuration);
+
+/**
+ * Sets the size of the reader receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by
the Consumer before the
+ * application calls receive(). Using a higher value could potentially
increase the consumer throughput
+ * at the expense of bigger memory utilization.
+ *
+ * Setting the consumer queue size as zero decreases the throughput of the
consumer, by disabling
+ * pre-fetching of
+ * messages. This approach improves the message distribution on shared
subscription, by pushing messages
+ * only to
+ * the consumers that are ready to process them. Neither receive with timeout
nor Partitioned Topics can
+ * be
+ * used if the consumer queue size is zero. The receive() function call should
not be interrupted when
+ * the consumer queue size is zero.
+ *
+ * Default value is 1000 messages and should be good for most use cases.
+ *
+ * @param size
+ * the new receiver queue size value
+ */
+void
pulsar_reader_configuration_set_receiver_queue_size(pulsar_reader_configuration_t
*configuration,
+ int size);
+
+int
pulsar_reader_configuration_get_receiver_queue_size(pulsar_reader_configuration_t
*configuration);
+
+void pulsar_reader_configuration_set_reader_name(pulsar_reader_configuration_t
*configuration,
+ const char *readerName);
+
+const char
*pulsar_reader_configuration_get_reader_name(pulsar_reader_configuration_t
*configuration);
+
+void
pulsar_reader_configuration_set_subscription_role_prefix(pulsar_reader_configuration_t
*configuration,
+ const char
*subscriptionRolePrefix);
+
+const char *pulsar_reader_configuration_get_subscription_role_prefix(
+ pulsar_reader_configuration_t *configuration);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/result.h
b/pulsar-client-cpp/include/pulsar/c/result.h
new file mode 100644
index 0000000..0ca769d
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/result.h
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Result codes returned by the C API. Enumerators take consecutive values
+ * starting at 0, in the order listed below.
+ */
+typedef enum {
+    /// Operation successful
+    pulsar_result_Ok,
+
+    /// Unknown error happened on broker
+    pulsar_result_UnknownError,
+
+    /// Invalid configuration
+    pulsar_result_InvalidConfiguration,
+
+    /// Operation timed out
+    pulsar_result_Timeout,
+    /// Broker lookup failed
+    pulsar_result_LookupError,
+    /// Failed to connect to broker
+    pulsar_result_ConnectError,
+    /// Failed to read from socket
+    pulsar_result_ReadError,
+
+    /// Authentication failed on broker
+    pulsar_result_AuthenticationError,
+    /// Client is not authorized to create producer/consumer
+    pulsar_result_AuthorizationError,
+    /// Client cannot find authorization data
+    pulsar_result_ErrorGettingAuthenticationData,
+
+    /// Broker failed in updating metadata
+    pulsar_result_BrokerMetadataError,
+    /// Broker failed to persist entry
+    pulsar_result_BrokerPersistenceError,
+    /// Corrupt message checksum failure
+    pulsar_result_ChecksumError,
+
+    /// Exclusive consumer is already connected
+    pulsar_result_ConsumerBusy,
+    /// Producer/Consumer is not currently connected to broker
+    pulsar_result_NotConnected,
+    /// Producer/Consumer is already closed and not accepting any operation
+    pulsar_result_AlreadyClosed,
+
+    /// Error in publishing an already used message
+    pulsar_result_InvalidMessage,
+
+    /// Consumer is not initialized
+    pulsar_result_ConsumerNotInitialized,
+    /// Producer is not initialized
+    pulsar_result_ProducerNotInitialized,
+    /// Too Many concurrent LookupRequest
+    pulsar_result_TooManyLookupRequestException,
+
+    /// Invalid topic name
+    pulsar_result_InvalidTopicName,
+    /// Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
+    pulsar_result_InvalidUrl,
+    /// Service Unit unloaded between client did lookup and producer/consumer got created
+    pulsar_result_ServiceUnitNotReady,
+
+    pulsar_result_OperationNotSupported,
+    /// Producer is blocked
+    pulsar_result_ProducerBlockedQuotaExceededError,
+    /// Producer is getting exception
+    pulsar_result_ProducerBlockedQuotaExceededException,
+    /// Producer queue is full
+    pulsar_result_ProducerQueueIsFull,
+    /// Trying to send a messages exceeding the max size
+    pulsar_result_MessageTooBig,
+    /// Topic not found
+    pulsar_result_TopicNotFound,
+    /// Subscription not found
+    pulsar_result_SubscriptionNotFound,
+    /// Consumer not found
+    pulsar_result_ConsumerNotFound,
+    /// Error when an older client/version doesn't support a required feature
+    pulsar_result_UnsupportedVersionError,
+    /// Topic was already terminated
+    pulsar_result_TopicTerminated,
+    /// Error when crypto operation fails
+    pulsar_result_CryptoError
+} pulsar_result;
+
+// Return string representation of result code
+const char *pulsar_result_str(pulsar_result result);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/lib/CMakeLists.txt
b/pulsar-client-cpp/lib/CMakeLists.txt
index 052de8b..74dbc08 100644
--- a/pulsar-client-cpp/lib/CMakeLists.txt
+++ b/pulsar-client-cpp/lib/CMakeLists.txt
@@ -17,7 +17,7 @@
# under the License.
#
-file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc stats/*.cc)
+file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc stats/*.cc c/*.cc)
execute_process(COMMAND cat ../pom.xml COMMAND xmllint --format - COMMAND sed
"s/xmlns=\".*\"//g" COMMAND xmllint --stream --pattern /project/version --debug
- COMMAND grep -A 2 "matches pattern" COMMAND grep text COMMAND sed "s/.* [0-9]
//g" OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE PV)
set (CMAKE_CXX_FLAGS " ${CMAKE_CXX_FLAGS} -D_PULSAR_VERSION_=\\\"${PV}\\\"")
diff --git a/pulsar-client-cpp/lib/c/c_Authentication.cc
b/pulsar-client-cpp/lib/c/c_Authentication.cc
new file mode 100644
index 0000000..5cd2a64
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Authentication.cc
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/authentication.h>
+
+#include <pulsar/Authentication.h>
+
+#include "c_structs.h"
+
+/**
+ * Create an authentication handle by delegating to pulsar::AuthFactory::create().
+ *
+ * @param dynamicLibPath first argument forwarded to the factory
+ * @param authParamsString second argument forwarded to the factory
+ * @return a newly allocated handle; release it with pulsar_authentication_free()
+ */
+pulsar_authentication_t *pulsar_authentication_create(const char *dynamicLibPath,
+                                                      const char *authParamsString) {
+    pulsar_authentication_t *handle = new pulsar_authentication_t;
+    handle->auth = pulsar::AuthFactory::create(dynamicLibPath, authParamsString);
+    return handle;
+}
+
+// Destroys a handle previously returned by pulsar_authentication_create().
+void pulsar_authentication_free(pulsar_authentication_t *authentication) {
+    delete authentication;
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc
b/pulsar-client-cpp/lib/c/c_Client.cc
new file mode 100644
index 0000000..cec7a13
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client.h>
+
+#include <boost/bind.hpp>
+
+#include "c_structs.h"
+
+/**
+ * Create a Pulsar client for the given service URL.
+ *
+ * @param serviceUrl the service URL handed to pulsar::Client
+ * @param clientConfiguration settings to apply to the client; when NULL the
+ *        C++ client is constructed with its default configuration
+ * @return a newly allocated client handle; release it with pulsar_client_free()
+ */
+pulsar_client_t *pulsar_client_create(const char *serviceUrl,
+                                      const pulsar_client_configuration_t *clientConfiguration) {
+    pulsar_client_t *c_client = new pulsar_client_t;
+    if (clientConfiguration) {
+        // Honor the caller's configuration; it used to be silently ignored.
+        c_client->client.reset(new pulsar::Client(std::string(serviceUrl), clientConfiguration->conf));
+    } else {
+        c_client->client.reset(new pulsar::Client(std::string(serviceUrl)));
+    }
+    return c_client;
+}
+
+// Destroys a client handle returned by pulsar_client_create().
+void pulsar_client_free(pulsar_client_t *client) {
+    delete client;
+}
+
+/**
+ * Synchronously create a producer on `topic`.
+ *
+ * On pulsar::ResultOk a newly allocated handle is stored in *c_producer and
+ * pulsar_result_Ok is returned; otherwise *c_producer is not written and the
+ * C++ result code is returned cast to pulsar_result.
+ */
+pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char *topic,
+                                            const pulsar_producer_configuration_t *conf,
+                                            pulsar_producer_t **c_producer) {
+    pulsar::Producer cppProducer;
+    pulsar::Result status = client->client->createProducer(topic, conf->conf, cppProducer);
+    if (status != pulsar::ResultOk) {
+        return (pulsar_result)status;
+    }
+
+    pulsar_producer_t *handle = new pulsar_producer_t;
+    handle->producer = cppProducer;
+    *c_producer = handle;
+    return pulsar_result_Ok;
+}
+
+// Adapts the C++ create-producer completion to the C callback: on success the
+// producer is wrapped in a newly allocated handle, on failure NULL is passed.
+static void handle_create_producer_callback(pulsar::Result result, pulsar::Producer producer,
+                                            pulsar_create_producer_callback callback) {
+    if (result != pulsar::ResultOk) {
+        callback((pulsar_result)result, NULL);
+        return;
+    }
+
+    pulsar_producer_t *handle = new pulsar_producer_t;
+    handle->producer = producer;
+    callback(pulsar_result_Ok, handle);
+}
+
+// Starts an asynchronous producer creation; `callback` is invoked through
+// handle_create_producer_callback once the C++ client completes the request.
+void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
+                                         const pulsar_producer_configuration_t *conf,
+                                         pulsar_create_producer_callback callback) {
+    client->client->createProducerAsync(
+        topic, conf->conf, boost::bind(&handle_create_producer_callback, _1, _2, callback));
+}
+
+/**
+ * Synchronously subscribe to `topic` under `subscriptionName`.
+ *
+ * On pulsar::ResultOk a newly allocated handle is stored in *c_consumer and
+ * pulsar_result_Ok is returned; otherwise *c_consumer is not written and the
+ * C++ result code is returned cast to pulsar_result.
+ */
+pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
+                                      const char *subscriptionName,
+                                      const pulsar_consumer_configuration_t *conf,
+                                      pulsar_consumer_t **c_consumer) {
+    pulsar::Consumer cppConsumer;
+    pulsar::Result status =
+        client->client->subscribe(topic, subscriptionName, conf->consumerConfiguration, cppConsumer);
+    if (status != pulsar::ResultOk) {
+        return (pulsar_result)status;
+    }
+
+    pulsar_consumer_t *handle = new pulsar_consumer_t;
+    handle->consumer = cppConsumer;
+    *c_consumer = handle;
+    return pulsar_result_Ok;
+}
+
+// Adapts the C++ subscribe completion to the C callback: on success the
+// consumer is wrapped in a newly allocated handle, on failure NULL is passed.
+static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer consumer,
+                                      pulsar_subscribe_callback callback) {
+    if (result != pulsar::ResultOk) {
+        callback((pulsar_result)result, NULL);
+        return;
+    }
+
+    pulsar_consumer_t *handle = new pulsar_consumer_t;
+    handle->consumer = consumer;
+    callback(pulsar_result_Ok, handle);
+}
+
+// Starts an asynchronous subscription; the outcome is delivered to `callback`
+// through handle_subscribe_callback. The `consumer` parameter is not used by
+// this function.
+void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
+                                   const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
+                                   pulsar_subscribe_callback callback) {
+    client->client->subscribeAsync(
+        topic, subscriptionName, conf->consumerConfiguration,
+        boost::bind(&handle_subscribe_callback, _1, _2, callback));
+}
+
+/**
+ * Synchronously create a reader on `topic` starting at `startMessageId`.
+ *
+ * On pulsar::ResultOk a newly allocated handle is stored in *c_reader and
+ * pulsar_result_Ok is returned; otherwise *c_reader is not written and the
+ * C++ result code is returned cast to pulsar_result.
+ */
+pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
+                                          const pulsar_message_id_t *startMessageId,
+                                          pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) {
+    pulsar::Reader cppReader;
+    pulsar::Result status =
+        client->client->createReader(topic, startMessageId->messageId, conf->conf, cppReader);
+    if (status != pulsar::ResultOk) {
+        return (pulsar_result)status;
+    }
+
+    pulsar_reader_t *handle = new pulsar_reader_t;
+    handle->reader = cppReader;
+    *c_reader = handle;
+    return pulsar_result_Ok;
+}
+
+// Adapts the C++ create-reader completion to the C callback: on success the
+// reader is wrapped in a newly allocated handle, on failure NULL is passed.
+static void handle_reader_callback(pulsar::Result result, pulsar::Reader reader,
+                                   pulsar_reader_callback callback) {
+    if (result != pulsar::ResultOk) {
+        callback((pulsar_result)result, NULL);
+        return;
+    }
+
+    pulsar_reader_t *handle = new pulsar_reader_t;
+    handle->reader = reader;
+    callback(pulsar_result_Ok, handle);
+}
+
+// Starts an asynchronous reader creation; the outcome is delivered to
+// `callback` through handle_reader_callback. The `reader` parameter is not
+// used by this function.
+void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
+                                       const pulsar_message_id_t *startMessageId,
+                                       pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
+                                       pulsar_reader_callback callback) {
+    client->client->createReaderAsync(
+        topic, startMessageId->messageId, conf->conf,
+        boost::bind(&handle_reader_callback, _1, _2, callback));
+}
+
+// Closes the client synchronously and returns the C++ result as a pulsar_result.
+pulsar_result pulsar_client_close(pulsar_client_t *client) {
+    return static_cast<pulsar_result>(client->client->close());
+}
+
+// Forwards the C++ close result to the C callback, cast to pulsar_result.
+static void handle_client_close(pulsar::Result result, pulsar_close_callback callback) {
+    callback(static_cast<pulsar_result>(result));
+}
+
+// Starts an asynchronous close; `callback` receives the result through
+// handle_client_close.
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback) {
+    client->client->closeAsync(
+        boost::bind(handle_client_close, _1, callback));
+}
diff --git a/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
b/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
new file mode 100644
index 0000000..8caf8e8
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client_configuration.h>
+
+#include "c_structs.h"
+
+// Allocates a configuration handle holding a default-constructed
+// pulsar::ClientConfiguration; release it with pulsar_client_configuration_free().
+pulsar_client_configuration_t *pulsar_client_configuration_create() {
+    pulsar_client_configuration_t *handle = new pulsar_client_configuration_t;
+    handle->conf = pulsar::ClientConfiguration();
+    return handle;
+}
+
+// Destroys a handle returned by pulsar_client_configuration_create().
+void pulsar_client_configuration_free(pulsar_client_configuration_t *conf) {
+    delete conf;
+}
+
+// Installs the authentication object held by `authentication` on the configuration.
+void pulsar_client_configuration_set_auth(pulsar_client_configuration_t *conf,
+                                          pulsar_authentication_t *authentication) {
+    conf->conf.setAuth(authentication->auth);
+}
+
+// Forwards `timeout` to ClientConfiguration::setOperationTimeoutSeconds().
+void pulsar_client_configuration_set_operation_timeout_seconds(pulsar_client_configuration_t *conf,
+                                                               int timeout) {
+    conf->conf.setOperationTimeoutSeconds(timeout);
+}
+
+// Returns the value of ClientConfiguration::getOperationTimeoutSeconds().
+int pulsar_client_configuration_get_operation_timeout_seconds(pulsar_client_configuration_t *conf) {
+    const int timeoutSeconds = conf->conf.getOperationTimeoutSeconds();
+    return timeoutSeconds;
+}
+
+// Forwards `threads` to ClientConfiguration::setIOThreads().
+void pulsar_client_configuration_set_io_threads(pulsar_client_configuration_t *conf,
+                                                int threads) {
+    conf->conf.setIOThreads(threads);
+}
+
+// Returns the value of ClientConfiguration::getIOThreads().
+int pulsar_client_configuration_get_io_threads(pulsar_client_configuration_t *conf) {
+    const int ioThreads = conf->conf.getIOThreads();
+    return ioThreads;
+}
+
+// Forwards `threads` to ClientConfiguration::setMessageListenerThreads().
+void pulsar_client_configuration_set_message_listener_threads(pulsar_client_configuration_t *conf,
+                                                              int threads) {
+    conf->conf.setMessageListenerThreads(threads);
+}
+
+// Returns the value of ClientConfiguration::getMessageListenerThreads().
+int pulsar_client_configuration_get_message_listener_threads(pulsar_client_configuration_t *conf) {
+    const int listenerThreads = conf->conf.getMessageListenerThreads();
+    return listenerThreads;
+}
+
+// Forwards `concurrentLookupRequest` to ClientConfiguration::setConcurrentLookupRequest().
+void pulsar_client_configuration_set_concurrent_lookup_request(pulsar_client_configuration_t *conf,
+                                                               int concurrentLookupRequest) {
+    conf->conf.setConcurrentLookupRequest(concurrentLookupRequest);
+}
+
+// Returns the value of ClientConfiguration::getConcurrentLookupRequest().
+int pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_configuration_t *conf) {
+    const int lookupRequests = conf->conf.getConcurrentLookupRequest();
+    return lookupRequests;
+}
+
+// Forwards `logConfFilePath` to ClientConfiguration::setLogConfFilePath().
+void pulsar_client_configuration_set_log_conf_file_path(pulsar_client_configuration_t *conf,
+                                                        const char *logConfFilePath) {
+    conf->conf.setLogConfFilePath(logConfFilePath);
+}
+
+// Returns getLogConfFilePath().c_str(); the pointer refers to storage owned by
+// the string that getLogConfFilePath() yields.
+const char *pulsar_client_configuration_get_log_conf_file_path(pulsar_client_configuration_t *conf) {
+    const char *path = conf->conf.getLogConfFilePath().c_str();
+    return path;
+}
+
+// Forwards `useTls` to ClientConfiguration::setUseTls().
+void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf,
+                                             int useTls) {
+    conf->conf.setUseTls(useTls);
+}
+
+// Returns ClientConfiguration::isUseTls() converted to int.
+int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf) {
+    const int useTls = conf->conf.isUseTls();
+    return useTls;
+}
+
+// Forwards `tlsTrustCertsFilePath` to ClientConfiguration::setTlsTrustCertsFilePath().
+void pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_configuration_t *conf,
+                                                               const char *tlsTrustCertsFilePath) {
+    conf->conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+}
+
+// Returns getTlsTrustCertsFilePath().c_str(); the pointer refers to storage
+// owned by the string that getTlsTrustCertsFilePath() yields.
+const char *pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t *conf) {
+    const char *path = conf->conf.getTlsTrustCertsFilePath().c_str();
+    return path;
+}
+
+// Forwards `allowInsecure` to ClientConfiguration::setTlsAllowInsecureConnection().
+void pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t *conf,
+                                                                   int allowInsecure) {
+    conf->conf.setTlsAllowInsecureConnection(allowInsecure);
+}
+
+// Returns ClientConfiguration::isTlsAllowInsecureConnection() converted to int.
+int pulsar_client_configuration_is_tls_allow_insecure_connection(pulsar_client_configuration_t *conf) {
+    const int allowInsecure = conf->conf.isTlsAllowInsecureConnection();
+    return allowInsecure;
+}
+
+// Forwards `interval` to ClientConfiguration::setStatsIntervalInSeconds().
+void pulsar_client_configuration_set_stats_interval_in_seconds(pulsar_client_configuration_t *conf,
+                                                               const unsigned int interval) {
+    conf->conf.setStatsIntervalInSeconds(interval);
+}
+
+// Returns the value of ClientConfiguration::getStatsIntervalInSeconds().
+const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds(
+    pulsar_client_configuration_t *conf) {
+    const unsigned int interval = conf->conf.getStatsIntervalInSeconds();
+    return interval;
+}
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc
b/pulsar-client-cpp/lib/c/c_Consumer.cc
new file mode 100644
index 0000000..22021a5
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/consumer.h>
+
+#include "c_structs.h"
+
+// Returns getTopic().c_str() of the wrapped C++ consumer.
+const char *pulsar_consumer_get_topic(pulsar_consumer_t *consumer) {
+    const char *topic = consumer->consumer.getTopic().c_str();
+    return topic;
+}
+
+// Returns getSubscriptionName().c_str() of the wrapped C++ consumer.
+const char *pulsar_consumer_get_subscription_name(pulsar_consumer_t *consumer) {
+    const char *subscription = consumer->consumer.getSubscriptionName().c_str();
+    return subscription;
+}
+
+// Unsubscribes synchronously and returns the C++ result as a pulsar_result.
+pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) {
+    return static_cast<pulsar_result>(consumer->consumer.unsubscribe());
+}
+
+// Starts an asynchronous unsubscribe; `callback` is invoked through
+// handle_result_callback.
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer,
+                                       pulsar_result_callback callback) {
+    consumer->consumer.unsubscribeAsync(
+        boost::bind(handle_result_callback, _1, callback));
+}
+
+/**
+ * Receive one message through the C++ consumer's receive().
+ *
+ * On pulsar::ResultOk a newly allocated message handle is stored in *msg;
+ * otherwise *msg is not written. The result code is returned in both cases.
+ */
+pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg) {
+    pulsar::Message received;
+    pulsar::Result status = consumer->consumer.receive(received);
+    if (status != pulsar::ResultOk) {
+        return (pulsar_result)status;
+    }
+
+    pulsar_message_t *handle = new pulsar_message_t;
+    handle->message = received;
+    *msg = handle;
+    return (pulsar_result)status;
+}
+
+/**
+ * Receive one message through the C++ consumer's receive(message, timeoutMs).
+ *
+ * On pulsar::ResultOk a newly allocated message handle is stored in *msg;
+ * otherwise *msg is not written. The result code is returned in both cases.
+ */
+pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t *consumer, pulsar_message_t **msg,
+                                                   int timeoutMs) {
+    pulsar::Message received;
+    pulsar::Result status = consumer->consumer.receive(received, timeoutMs);
+    if (status != pulsar::ResultOk) {
+        return (pulsar_result)status;
+    }
+
+    pulsar_message_t *handle = new pulsar_message_t;
+    handle->message = received;
+    *msg = handle;
+    return (pulsar_result)status;
+}
+
+// Acknowledges the given message synchronously.
+pulsar_result pulsar_consumer_acknowledge(pulsar_consumer_t *consumer, pulsar_message_t *message) {
+    return static_cast<pulsar_result>(consumer->consumer.acknowledge(message->message));
+}
+
+// Acknowledges the message identified by `messageId` synchronously.
+pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer,
+                                             pulsar_message_id_t *messageId) {
+    return static_cast<pulsar_result>(consumer->consumer.acknowledge(messageId->messageId));
+}
+
+// Acknowledges the given message asynchronously; `callback` is invoked through
+// handle_result_callback.
+void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
+                                       pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeAsync(
+        message->message, boost::bind(handle_result_callback, _1, callback));
+}
+
+// Acknowledges the message identified by `messageId` asynchronously; `callback`
+// is invoked through handle_result_callback.
+void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
+                                          pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeAsync(
+        messageId->messageId, boost::bind(handle_result_callback, _1, callback));
+}
+
+// Cumulatively acknowledges up to the given message, synchronously.
+pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer,
+                                                     pulsar_message_t *message) {
+    return static_cast<pulsar_result>(consumer->consumer.acknowledgeCumulative(message->message));
+}
+
+/**
+ * Cumulatively acknowledge up to the message identified by `messageId`.
+ *
+ * @param consumer the consumer handle
+ * @param messageId id passed to the C++ consumer's acknowledgeCumulative()
+ * @return the C++ result code cast to pulsar_result
+ */
+pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consumer,
+                                                        pulsar_message_id_t *messageId) {
+    // Must be acknowledgeCumulative(): the previous code called acknowledge(),
+    // which acks only the single message instead of everything up to it.
+    return (pulsar_result)consumer->consumer.acknowledgeCumulative(messageId->messageId);
+}
+
+// Cumulatively acknowledges up to the given message asynchronously; `callback`
+// is invoked through handle_result_callback.
+void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
+                                                  pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeCumulativeAsync(
+        message->message, boost::bind(handle_result_callback, _1, callback));
+}
+
+// Cumulatively acknowledges up to `messageId` asynchronously; `callback` is
+// invoked through handle_result_callback.
+void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
+                                                     pulsar_message_id_t *messageId,
+                                                     pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeCumulativeAsync(
+        messageId->messageId, boost::bind(handle_result_callback, _1, callback));
+}
+
+// Closes the consumer synchronously and returns the C++ result as a pulsar_result.
+pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
+    return static_cast<pulsar_result>(consumer->consumer.close());
+}
+
+// Starts an asynchronous close; `callback` is invoked through
+// handle_result_callback.
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer,
+                                 pulsar_result_callback callback) {
+    consumer->consumer.closeAsync(
+        boost::bind(handle_result_callback, _1, callback));
+}
+
+// Destroys a consumer handle.
+void pulsar_consumer_free(pulsar_consumer_t *consumer) {
+    delete consumer;
+}
+
+// Calls pauseMessageListener() on the wrapped consumer and returns its result.
+pulsar_result pulsar_consumer_pause_message_listener(pulsar_consumer_t *consumer) {
+    return static_cast<pulsar_result>(consumer->consumer.pauseMessageListener());
+}
+
+// Calls resumeMessageListener() on the wrapped consumer and returns its result.
+// NOTE(review): unlike its siblings this name lacks the pulsar_consumer_ prefix;
+// confirm it matches the declaration in pulsar/c/consumer.h.
+pulsar_result resume_message_listener(pulsar_consumer_t *consumer) {
+    return static_cast<pulsar_result>(consumer->consumer.resumeMessageListener());
+}
+
+// Calls redeliverUnacknowledgedMessages() on the wrapped consumer.
+// NOTE(review): this name does not follow the pulsar_consumer_* snake_case
+// convention used by its siblings; confirm it matches pulsar/c/consumer.h.
+void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer) {
+    consumer->consumer.redeliverUnacknowledgedMessages();
+}
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
new file mode 100644
index 0000000..dcd0aac
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/consumer_configuration.h>
+
+#include "c_structs.h"
+
+// Allocates a consumer configuration handle; release it with
+// pulsar_consumer_configuration_free().
+pulsar_consumer_configuration_t *pulsar_consumer_configuration_create() {
+    pulsar_consumer_configuration_t *handle = new pulsar_consumer_configuration_t;
+    return handle;
+}
+
+// Destroys a handle returned by pulsar_consumer_configuration_create().
+void pulsar_consumer_configuration_free(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    delete consumer_configuration;
+}
+
+// Sets the consumer type, casting the C enum value to pulsar::ConsumerType.
+void pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configuration_t *consumer_configuration,
+                                                     pulsar_consumer_type consumerType) {
+    consumer_configuration->consumerConfiguration.setConsumerType(
+        static_cast<pulsar::ConsumerType>(consumerType));
+}
+
+// Returns the configured consumer type cast to the C enum.
+pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return static_cast<pulsar_consumer_type>(
+        consumer_configuration->consumerConfiguration.getConsumerType());
+}
+
+// Bridges the C++ message listener to the C listener. The consumer is handed
+// over as a pointer to a stack-local wrapper (valid only during the call); the
+// message wrapper is heap-allocated and is not deleted by this function.
+static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
+                                      pulsar_message_listener listener) {
+    pulsar_message_t *c_message = new pulsar_message_t;
+    c_message->message = msg;
+
+    pulsar_consumer_t wrapper;
+    wrapper.consumer = consumer;
+
+    listener(&wrapper, c_message);
+}
+
+// Registers `messageListener`; messages reach it through message_listener_callback.
+void pulsar_consumer_configuration_set_message_listener(pulsar_consumer_configuration_t *consumer_configuration,
+                                                        pulsar_message_listener messageListener) {
+    consumer_configuration->consumerConfiguration.setMessageListener(
+        boost::bind(message_listener_callback, _1, _2, messageListener));
+}
+
+// Returns ConsumerConfiguration::hasMessageListener() converted to int.
+int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration) {
+    const int hasListener = consumer_configuration->consumerConfiguration.hasMessageListener();
+    return hasListener;
+}
+
+// Forwards `size` to ConsumerConfiguration::setReceiverQueueSize().
+void pulsar_consumer_configuration_set_receiver_queue_size(pulsar_consumer_configuration_t *consumer_configuration,
+                                                           int size) {
+    consumer_configuration->consumerConfiguration.setReceiverQueueSize(size);
+}
+
+// Returns the value of ConsumerConfiguration::getReceiverQueueSize().
+int pulsar_consumer_configuration_get_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    const int queueSize = consumer_configuration->consumerConfiguration.getReceiverQueueSize();
+    return queueSize;
+}
+
+// Forwards the value to
+// ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions().
+void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration,
+    int maxTotalReceiverQueueSizeAcrossPartitions) {
+    consumer_configuration->consumerConfiguration.setMaxTotalReceiverQueueSizeAcrossPartitions(
+        maxTotalReceiverQueueSizeAcrossPartitions);
+}
+
+// Returns the value of
+// ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions().
+int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    const int maxTotal =
+        consumer_configuration->consumerConfiguration.getMaxTotalReceiverQueueSizeAcrossPartitions();
+    return maxTotal;
+}
+
+// Forwards `consumerName` to ConsumerConfiguration::setConsumerName().
+void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t *consumer_configuration,
+                                       const char *consumerName) {
+    consumer_configuration->consumerConfiguration.setConsumerName(consumerName);
+}
+
+// Returns getConsumerName().c_str() of the wrapped configuration.
+const char *pulsar_consumer_get_consumer_name(pulsar_consumer_configuration_t *consumer_configuration) {
+    const char *name = consumer_configuration->consumerConfiguration.getConsumerName().c_str();
+    return name;
+}
+
+// Forwards `milliSeconds` to ConsumerConfiguration::setUnAckedMessagesTimeoutMs().
+void pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configuration_t *consumer_configuration,
+                                                     const uint64_t milliSeconds) {
+    consumer_configuration->consumerConfiguration.setUnAckedMessagesTimeoutMs(milliSeconds);
+}
+
+// Returns ConsumerConfiguration::getUnAckedMessagesTimeoutMs() converted to long.
+long pulsar_consumer_get_unacked_messages_timeout_ms(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    const long timeoutMs = consumer_configuration->consumerConfiguration.getUnAckedMessagesTimeoutMs();
+    return timeoutMs;
+}
+
+// Returns ConsumerConfiguration::isEncryptionEnabled() converted to int.
+int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consumer_configuration) {
+    const int enabled = consumer_configuration->consumerConfiguration.isEncryptionEnabled();
+    return enabled;
+}
diff --git a/pulsar-client-cpp/lib/c/c_Message.cc
b/pulsar-client-cpp/lib/c/c_Message.cc
new file mode 100644
index 0000000..d87560e
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Message.cc
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message.h>
+#include "c_structs.h"
+
+// Allocates an empty message handle; release it with pulsar_message_free().
+pulsar_message_t *pulsar_message_create() {
+    pulsar_message_t *handle = new pulsar_message_t;
+    return handle;
+}
+
+// Destroys a message handle.
+void pulsar_message_free(pulsar_message_t *message) {
+    delete message;
+}
+
+// Forwards `data`/`size` to the message builder's setContent().
+void pulsar_message_set_content(pulsar_message_t *message, const void *data,
+                                size_t size) {
+    message->builder.setContent(data, size);
+}
+
+// Forwards `data`/`size` to the message builder's setAllocatedContent().
+void pulsar_message_set_allocated_content(pulsar_message_t *message, void *data,
+                                          size_t size) {
+    message->builder.setAllocatedContent(data, size);
+}
+
+// Forwards the name/value pair to the message builder's setProperty().
+void pulsar_message_set_property(pulsar_message_t *message, const char *name,
+                                 const char *value) {
+    message->builder.setProperty(name, value);
+}
+
+// Forwards `partitionKey` to the message builder's setPartitionKey().
+void pulsar_message_set_partition_key(pulsar_message_t *message,
+                                      const char *partitionKey) {
+    message->builder.setPartitionKey(partitionKey);
+}
+
+// Forwards `eventTimestamp` to the message builder's setEventTimestamp().
+void pulsar_message_set_event_timestamp(pulsar_message_t *message,
+                                        uint64_t eventTimestamp) {
+    message->builder.setEventTimestamp(eventTimestamp);
+}
+
+// Forwards `sequenceId` to the message builder's setSequenceId().
+void pulsar_message_set_sequence_id(pulsar_message_t *message,
+                                    int64_t sequenceId) {
+    message->builder.setSequenceId(sequenceId);
+}
+
+/**
+ * Set the clusters this message is replicated to.
+ *
+ * @param message the message handle whose builder is updated
+ * @param clusters array of cluster names; it is read up to the first NULL
+ *        entry (the termination the original loop condition implies -
+ *        TODO confirm against pulsar/c/message.h). A NULL array yields an
+ *        empty cluster list.
+ */
+void pulsar_message_set_replication_clusters(pulsar_message_t *message, const char **clusters) {
+    std::vector<std::string> clustersList;
+    // Step through the array of C strings. The previous code advanced a char
+    // pointer inside the first name and tested the pointer itself, so the loop
+    // never terminated and pushed garbage suffixes.
+    for (const char **c = clusters; c && *c; ++c) {
+        clustersList.push_back(*c);
+    }
+
+    message->builder.setReplicationClusters(clustersList);
+}
+
+// Forwards `flag` to the message builder's disableReplication().
+void pulsar_message_disable_replication(pulsar_message_t *message,
+                                        int flag) {
+    message->builder.disableReplication(flag);
+}
+
+// Returns Message::hasProperty(name) converted to int.
+int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
+    const int present = message->message.hasProperty(name);
+    return present;
+}
+
+// Returns getProperty(name).c_str() of the wrapped message.
+const char *pulsar_message_get_property(pulsar_message_t *message, const char *name) {
+    const char *value = message->message.getProperty(name).c_str();
+    return value;
+}
+
+// Returns the pointer produced by Message::getData().
+const void *pulsar_message_get_data(pulsar_message_t *message) {
+    const void *payload = message->message.getData();
+    return payload;
+}
+
+// Returns the value of Message::getLength().
+uint32_t pulsar_message_get_length(pulsar_message_t *message) {
+    const uint32_t length = message->message.getLength();
+    return length;
+}
+
+// Returns a newly allocated message-id handle holding a copy of the message's
+// id; this function does not retain or free it.
+pulsar_message_id_t *pulsar_message_get_message_id(pulsar_message_t *message) {
+    pulsar_message_id_t *idHandle = new pulsar_message_id_t;
+    idHandle->messageId = message->message.getMessageId();
+    return idHandle;
+}
+
+// Returns getPartitionKey().c_str() of the wrapped message.
+const char *pulsar_message_get_partitionKey(pulsar_message_t *message) {
+    const char *key = message->message.getPartitionKey().c_str();
+    return key;
+}
+
+// Returns Message::hasPartitionKey() converted to int.
+int pulsar_message_has_partition_key(pulsar_message_t *message) {
+    const int hasKey = message->message.hasPartitionKey();
+    return hasKey;
+}
+
+// Returns the value of Message::getPublishTimestamp().
+uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message) {
+    const uint64_t publishTime = message->message.getPublishTimestamp();
+    return publishTime;
+}
+
+// Returns the value of Message::getEventTimestamp().
+uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message) {
+    const uint64_t eventTime = message->message.getEventTimestamp();
+    return eventTime;
+}
diff --git a/pulsar-client-cpp/lib/c/c_MessageId.cc
b/pulsar-client-cpp/lib/c/c_MessageId.cc
new file mode 100644
index 0000000..5728359
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_MessageId.cc
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message_id.h>
+#include "c_structs.h"
+
+#include <boost/thread/once.hpp>
+
+// File-local state for the shared earliest/latest message ids. The once flag
+// is static like the two ids so the generic name `initialized` is not exported
+// from the library as a global symbol.
+static boost::once_flag initialized = BOOST_ONCE_INIT;
+
+static pulsar_message_id_t earliest;
+static pulsar_message_id_t latest;
+
+// Fills the two file-level ids from pulsar::MessageId::latest()/earliest();
+// invoked through boost::call_once by the accessors below.
+static void initialize() {
+    latest.messageId = pulsar::MessageId::latest();
+    earliest.messageId = pulsar::MessageId::earliest();
+}
+
+// Returns a pointer to the file-level `earliest` id, running initialize() once
+// beforehand via boost::call_once.
+const pulsar_message_id_t *pulsar_message_id_earliest() {
+    boost::call_once(&initialize, initialized);
+    const pulsar_message_id_t *shared = &earliest;
+    return shared;
+}
+
+// Returns a pointer to the file-level `latest` id, running initialize() once
+// beforehand via boost::call_once.
+const pulsar_message_id_t *pulsar_message_id_latest() {
+    boost::call_once(&initialize, initialized);
+    const pulsar_message_id_t *shared = &latest;
+    return shared;
+}
+
+/**
+ * Serialize a message id into a newly malloc()'ed byte buffer.
+ *
+ * @param messageId the message id to serialize
+ * @param len out parameter that receives the number of bytes in the returned
+ *        buffer (0 when NULL is returned); may itself be NULL
+ * @return the buffer allocated with malloc(), or NULL when malloc() returns
+ *         NULL
+ */
+const void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
+    std::string str;
+    messageId->messageId.serialize(str);
+
+    void *p = malloc(str.length());
+    if (p) {
+        memcpy(p, str.c_str(), str.length());
+    }
+
+    // Report the size: previously `len` was never written, so callers could
+    // not know how many bytes the buffer holds.
+    if (len) {
+        *len = p ? (int)str.length() : 0;
+    }
+    return p;
+}
+
+// Rebuilds a message id from `len` bytes at `buffer` using
+// pulsar::MessageId::deserialize() and returns it in a newly allocated handle.
+pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t len) {
+    pulsar_message_id_t *idHandle = new pulsar_message_id_t;
+    std::string serialized((const char *)buffer, len);
+    idHandle->messageId = pulsar::MessageId::deserialize(serialized);
+    return idHandle;
+}
diff --git a/pulsar-client-cpp/lib/c/c_MessageRouter.cc
b/pulsar-client-cpp/lib/c/c_MessageRouter.cc
new file mode 100644
index 0000000..3184357
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_MessageRouter.cc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message_router.h>
+
+#include "c_structs.h"
+
+// Reports the partition count of the pulsar::TopicMetadata that `topicMetadata` points at.
+int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t *topicMetadata) {
+    const pulsar::TopicMetadata *wrapped = topicMetadata->metadata;
+    return wrapped->getNumPartitions();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc
b/pulsar-client-cpp/lib/c/c_Producer.cc
new file mode 100644
index 0000000..1de670c
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Producer.cc
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/bind.hpp>
+
+#include <pulsar/c/producer.h>
+
+#include "c_structs.h"
+
+// Returns the C-string form of the wrapped producer's getTopic() value.
+const char *pulsar_producer_get_topic(pulsar_producer_t *producer) {
+    const std::string &topic = producer->producer.getTopic();
+    return topic.c_str();
+}
+
+// Returns the C-string form of the wrapped producer's getProducerName() value.
+const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer) {
+    const std::string &name = producer->producer.getProducerName();
+    return name.c_str();
+}
+
+// Destroys the producer wrapper with `delete`.
+void pulsar_producer_free(pulsar_producer_t *producer) {
+    delete producer;
+}
+
+// Builds the message from `msg->builder`, stores it in `msg->message`, and hands it to the
+// producer's send(). Returns the resulting status converted to pulsar_result.
+pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t *msg) {
+    msg->message = msg->builder.build();
+    const pulsar::Result outcome = producer->producer.send(msg->message);
+    return static_cast<pulsar_result>(outcome);
+}
+
+// Completion adapter bound by pulsar_producer_send_async(): converts the C++ result to
+// pulsar_result and forwards it, together with the original message wrapper, to the C callback.
+// A null `callback` is skipped so that it is never invoked.
+static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg,
+                                 pulsar_send_callback callback) {
+    if (callback) {
+        callback(static_cast<pulsar_result>(result), msg);
+    }
+}
+
+// Builds the message from `msg->builder` into `msg->message` and submits it via sendAsync().
+// `callback` is reached through handle_producer_send, which also receives `msg` back.
+void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
+                                pulsar_send_callback callback) {
+    pulsar::Message &built = msg->message;
+    built = msg->builder.build();
+    producer->producer.sendAsync(built, boost::bind(&handle_producer_send, _1, msg, callback));
+}
+
+// Returns the wrapped producer's getLastSequenceId() value.
+int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
+    const int64_t lastSequenceId = producer->producer.getLastSequenceId();
+    return lastSequenceId;
+}
+
+// Calls close() on the wrapped producer and returns its status as pulsar_result.
+pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
+    const pulsar::Result outcome = producer->producer.close();
+    return static_cast<pulsar_result>(outcome);
+}
+
+// Calls closeAsync() on the wrapped producer; the result is routed to `callback` through
+// handle_result_callback.
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback) {
+    pulsar::Producer &wrapped = producer->producer;
+    wrapped.closeAsync(boost::bind(handle_result_callback, _1, callback));
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
new file mode 100644
index 0000000..8cc9345
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/c/producer_configuration.h>
+#include <include/pulsar/c/message.h>
+
+#include "c_structs.h"
+
+// Allocates a producer-configuration wrapper with `new` and assigns it a
+// default-constructed pulsar::ProducerConfiguration.
+pulsar_producer_configuration_t *pulsar_producer_configuration_create() {
+    pulsar_producer_configuration_t *wrapper = new pulsar_producer_configuration_t;
+    wrapper->conf = pulsar::ProducerConfiguration();
+    return wrapper;
+}
+
+// Destroys the configuration wrapper with `delete`.
+void pulsar_producer_configuration_free(pulsar_producer_configuration_t *conf) {
+    delete conf;
+}
+
+// Forwards `producerName` to ProducerConfiguration::setProducerName().
+void pulsar_producer_configuration_set_producer_name(pulsar_producer_configuration_t *conf,
+                                                     const char *producerName) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setProducerName(producerName);
+}
+
+// Returns the C-string form of the configuration's getProducerName() value.
+const char *pulsar_producer_configuration_get_producer_name(pulsar_producer_configuration_t *conf) {
+    const std::string &name = conf->conf.getProducerName();
+    return name.c_str();
+}
+
+// Forwards `sendTimeoutMs` to ProducerConfiguration::setSendTimeout().
+void pulsar_producer_configuration_set_send_timeout(pulsar_producer_configuration_t *conf,
+                                                    int sendTimeoutMs) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setSendTimeout(sendTimeoutMs);
+}
+
+// Returns the configuration's getSendTimeout() value.
+int pulsar_producer_configuration_get_send_timeout(pulsar_producer_configuration_t *conf) {
+    const int sendTimeout = conf->conf.getSendTimeout();
+    return sendTimeout;
+}
+
+// Forwards `initialSequenceId` to ProducerConfiguration::setInitialSequenceId().
+void pulsar_producer_configuration_set_initial_sequence_id(pulsar_producer_configuration_t *conf,
+                                                           int64_t initialSequenceId) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setInitialSequenceId(initialSequenceId);
+}
+
+// Returns the configuration's getInitialSequenceId() value.
+int64_t pulsar_producer_configuration_get_initial_sequence_id(pulsar_producer_configuration_t *conf) {
+    const int64_t initialSequenceId = conf->conf.getInitialSequenceId();
+    return initialSequenceId;
+}
+
+// Converts the C enum value to pulsar::CompressionType and stores it in the configuration.
+void pulsar_producer_configuration_set_compression_type(pulsar_producer_configuration_t *conf,
+                                                        pulsar_compression_type compressionType) {
+    const pulsar::CompressionType native = static_cast<pulsar::CompressionType>(compressionType);
+    conf->conf.setCompressionType(native);
+}
+
+// Returns the configured compression type converted to the C enum.
+pulsar_compression_type pulsar_producer_configuration_get_compression_type(
+    pulsar_producer_configuration_t *conf) {
+    const pulsar::ProducerConfiguration &settings = conf->conf;
+    return static_cast<pulsar_compression_type>(settings.getCompressionType());
+}
+
+// Forwards `maxPendingMessages` to ProducerConfiguration::setMaxPendingMessages().
+void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
+                                                            int maxPendingMessages) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setMaxPendingMessages(maxPendingMessages);
+}
+
+// Returns the configuration's getMaxPendingMessages() value.
+int pulsar_producer_configuration_get_max_pending_messages(pulsar_producer_configuration_t *conf) {
+    const int maxPending = conf->conf.getMaxPendingMessages();
+    return maxPending;
+}
+
+// Forwards the limit to ProducerConfiguration::setMaxPendingMessagesAcrossPartitions().
+void pulsar_producer_configuration_set_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf, int maxPendingMessagesAcrossPartitions) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+}
+
+// Returns the configuration's getMaxPendingMessagesAcrossPartitions() value.
+int pulsar_producer_configuration_get_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf) {
+    const int maxPending = conf->conf.getMaxPendingMessagesAcrossPartitions();
+    return maxPending;
+}
+
+// Converts the C enum value to ProducerConfiguration::PartitionsRoutingMode and stores it.
+void pulsar_producer_configuration_set_partitions_routing_mode(pulsar_producer_configuration_t *conf,
+                                                               pulsar_partitions_routing_mode mode) {
+    typedef pulsar::ProducerConfiguration::PartitionsRoutingMode NativeMode;
+    conf->conf.setPartitionsRoutingMode(static_cast<NativeMode>(mode));
+}
+
+// Returns the configured partitions routing mode converted to the C enum.
+pulsar_partitions_routing_mode pulsar_producer_configuration_get_partitions_routing_mode(
+    pulsar_producer_configuration_t *conf) {
+    const pulsar::ProducerConfiguration &settings = conf->conf;
+    return static_cast<pulsar_partitions_routing_mode>(settings.getPartitionsRoutingMode());
+}
+
+// Converts the C enum value to ProducerConfiguration::HashingScheme and stores it.
+void pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t *conf,
+                                                      pulsar_hashing_scheme scheme) {
+    typedef pulsar::ProducerConfiguration::HashingScheme NativeScheme;
+    conf->conf.setHashingScheme(static_cast<NativeScheme>(scheme));
+}
+
+// Returns the configured hashing scheme converted to the C enum.
+pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(
+    pulsar_producer_configuration_t *conf) {
+    const pulsar::ProducerConfiguration &settings = conf->conf;
+    return static_cast<pulsar_hashing_scheme>(settings.getHashingScheme());
+}
+
+// Adapter that lets a C `pulsar_message_router` function serve as a pulsar::MessageRoutingPolicy.
+class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
+   public:
+    MessageRoutingPolicy(pulsar_message_router router) : routerFn_(router) {}
+
+    // Wraps the message and the topic metadata in stack-allocated C structs and returns whatever
+    // partition the C router picks. The wrappers exist only for the duration of this call.
+    int getPartition(const pulsar::Message &msg, const pulsar::TopicMetadata &topicMetadata) {
+        pulsar_message_t wrappedMessage;
+        wrappedMessage.message = msg;
+
+        pulsar_topic_metadata_t wrappedMetadata;
+        wrappedMetadata.metadata = &topicMetadata;
+
+        return routerFn_(&wrappedMessage, &wrappedMetadata);
+    }
+
+   private:
+    pulsar_message_router routerFn_;  // C routing function supplied at construction
+};
+
+// Wraps the C `router` function in a MessageRoutingPolicy and installs it on the configuration.
+void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
+                                                      pulsar_message_router router) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router));
+}
+
+// Forwards the integer flag `blockIfQueueFull` to ProducerConfiguration::setBlockIfQueueFull().
+void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
+                                                           int blockIfQueueFull) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setBlockIfQueueFull(blockIfQueueFull);
+}
+
+// Returns the configuration's getBlockIfQueueFull() value as an int.
+int pulsar_producer_configuration_get_block_if_queue_full(pulsar_producer_configuration_t *conf) {
+    const int flag = conf->conf.getBlockIfQueueFull();
+    return flag;
+}
+
+// Forwards the integer flag `batchingEnabled` to ProducerConfiguration::setBatchingEnabled().
+void pulsar_producer_configuration_set_batching_enabled(pulsar_producer_configuration_t *conf,
+                                                        int batchingEnabled) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setBatchingEnabled(batchingEnabled);
+}
+
+// Returns the configuration's getBatchingEnabled() value as an int.
+int pulsar_producer_configuration_get_batching_enabled(pulsar_producer_configuration_t *conf) {
+    const int flag = conf->conf.getBatchingEnabled();
+    return flag;
+}
+
+// Forwards `batchingMaxMessages` to ProducerConfiguration::setBatchingMaxMessages().
+void pulsar_producer_configuration_set_batching_max_messages(pulsar_producer_configuration_t *conf,
+                                                             unsigned int batchingMaxMessages) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setBatchingMaxMessages(batchingMaxMessages);
+}
+
+// Returns the configuration's getBatchingMaxMessages() value.
+unsigned int pulsar_producer_configuration_get_batching_max_messages(
+    pulsar_producer_configuration_t *conf) {
+    const unsigned int maxMessages = conf->conf.getBatchingMaxMessages();
+    return maxMessages;
+}
+
+// Forwards the size limit to ProducerConfiguration::setBatchingMaxAllowedSizeInBytes().
+void pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf, unsigned long batchingMaxAllowedSizeInBytes) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setBatchingMaxAllowedSizeInBytes(batchingMaxAllowedSizeInBytes);
+}
+
+// Returns the configuration's getBatchingMaxAllowedSizeInBytes() value.
+unsigned long pulsar_producer_configuration_get_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf) {
+    const unsigned long maxBytes = conf->conf.getBatchingMaxAllowedSizeInBytes();
+    return maxBytes;
+}
+
+// Forwards the delay to ProducerConfiguration::setBatchingMaxPublishDelayMs().
+void pulsar_producer_configuration_set_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf, unsigned long batchingMaxPublishDelayMs) {
+    pulsar::ProducerConfiguration &settings = conf->conf;
+    settings.setBatchingMaxPublishDelayMs(batchingMaxPublishDelayMs);
+}
+
+// Returns the configuration's getBatchingMaxPublishDelayMs() value.
+unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf) {
+    const unsigned long delayMs = conf->conf.getBatchingMaxPublishDelayMs();
+    return delayMs;
+}
diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc
b/pulsar-client-cpp/lib/c/c_Reader.cc
new file mode 100644
index 0000000..bb5d69b
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Reader.cc
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/reader.h>
+#include <pulsar/Reader.h>
+
+#include "c_structs.h"
+
+// Returns the C-string form of the wrapped reader's getTopic() value.
+const char *pulsar_reader_get_topic(pulsar_reader_t *reader) {
+    const std::string &topic = reader->reader.getTopic();
+    return topic.c_str();
+}
+
+// Reads the next message from the wrapped reader.
+// On ResultOk, `*msg` receives a wrapper allocated with `new`; on any other result `*msg` is
+// left untouched. The reader's status is returned as pulsar_result.
+pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, pulsar_message_t **msg) {
+    pulsar::Message received;
+    const pulsar::Result status = reader->reader.readNext(received);
+    if (status != pulsar::ResultOk) {
+        return static_cast<pulsar_result>(status);
+    }
+
+    pulsar_message_t *wrapper = new pulsar_message_t;
+    *msg = wrapper;
+    wrapper->message = received;
+    return static_cast<pulsar_result>(status);
+}
+
+// Same as pulsar_reader_read_next(), but passes `timeoutMs` to the reader's readNext().
+// On ResultOk, `*msg` receives a wrapper allocated with `new`; otherwise `*msg` is left untouched.
+pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, pulsar_message_t **msg,
+                                                   int timeoutMs) {
+    pulsar::Message received;
+    const pulsar::Result status = reader->reader.readNext(received, timeoutMs);
+    if (status != pulsar::ResultOk) {
+        return static_cast<pulsar_result>(status);
+    }
+
+    pulsar_message_t *wrapper = new pulsar_message_t;
+    *msg = wrapper;
+    wrapper->message = received;
+    return static_cast<pulsar_result>(status);
+}
+
+// Calls close() on the wrapped reader and returns its status as pulsar_result.
+pulsar_result pulsar_reader_close(pulsar_reader_t *reader) {
+    const pulsar::Result outcome = reader->reader.close();
+    return static_cast<pulsar_result>(outcome);
+}
+
+// Calls closeAsync() on the wrapped reader; the result is routed to `callback` through
+// handle_result_callback.
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback) {
+    pulsar::Reader &wrapped = reader->reader;
+    wrapped.closeAsync(boost::bind(handle_result_callback, _1, callback));
+}
+
+// Destroys the reader wrapper with `delete`.
+void pulsar_reader_free(pulsar_reader_t *reader) {
+    delete reader;
+}
diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
new file mode 100644
index 0000000..b6419a6
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/reader.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/ReaderConfiguration.h>
+#include <pulsar/Reader.h>
+
+#include "c_structs.h"
+
+// Allocates a reader-configuration wrapper with `new`.
+pulsar_reader_configuration_t *pulsar_reader_configuration_create() {
+    pulsar_reader_configuration_t *wrapper = new pulsar_reader_configuration_t;
+    return wrapper;
+}
+
+// Destroys the reader-configuration wrapper with `delete`.
+void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configuration) {
+    delete configuration;
+}
+
+// Bridge from the C++ reader listener to a C `pulsar_reader_listener`.
+// The reader is passed through a stack-allocated wrapper (valid only during the call), while the
+// message wrapper is allocated with `new` and is not deleted here.
+static void message_listener_callback(pulsar::Reader reader, const pulsar::Message &msg,
+                                      pulsar_reader_listener listener) {
+    pulsar_reader_t readerWrapper;
+    readerWrapper.reader = reader;
+
+    pulsar_message_t *messageWrapper = new pulsar_message_t;
+    messageWrapper->message = msg;
+
+    listener(&readerWrapper, messageWrapper);
+}
+
+// Installs `listener` on the configuration by binding it into message_listener_callback.
+void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
+                                                     pulsar_reader_listener listener) {
+    pulsar::ReaderConfiguration &settings = configuration->conf;
+    settings.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener));
+}
+
+// Returns the configuration's hasReaderListener() value as an int.
+int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration) {
+    const int hasListener = configuration->conf.hasReaderListener();
+    return hasListener;
+}
+
+// Forwards `size` to ReaderConfiguration::setReceiverQueueSize().
+void pulsar_reader_configuration_set_receiver_queue_size(pulsar_reader_configuration_t *configuration,
+                                                         int size) {
+    pulsar::ReaderConfiguration &settings = configuration->conf;
+    settings.setReceiverQueueSize(size);
+}
+
+// Returns the configuration's getReceiverQueueSize() value.
+int pulsar_reader_configuration_get_receiver_queue_size(pulsar_reader_configuration_t *configuration) {
+    const int queueSize = configuration->conf.getReceiverQueueSize();
+    return queueSize;
+}
+
+// Forwards `readerName` to ReaderConfiguration::setReaderName().
+void pulsar_reader_configuration_set_reader_name(pulsar_reader_configuration_t *configuration,
+                                                 const char *readerName) {
+    pulsar::ReaderConfiguration &settings = configuration->conf;
+    settings.setReaderName(readerName);
+}
+
+// Returns the C-string form of the configuration's getReaderName() value.
+const char *pulsar_reader_configuration_get_reader_name(pulsar_reader_configuration_t *configuration) {
+    const std::string &name = configuration->conf.getReaderName();
+    return name.c_str();
+}
+
+// Forwards `subscriptionRolePrefix` to ReaderConfiguration::setSubscriptionRolePrefix().
+void pulsar_reader_configuration_set_subscription_role_prefix(pulsar_reader_configuration_t *configuration,
+                                                              const char *subscriptionRolePrefix) {
+    pulsar::ReaderConfiguration &settings = configuration->conf;
+    settings.setSubscriptionRolePrefix(subscriptionRolePrefix);
+}
+
+// Returns the C-string form of the configuration's getSubscriptionRolePrefix() value.
+const char *pulsar_reader_configuration_get_subscription_role_prefix(
+    pulsar_reader_configuration_t *configuration) {
+    const std::string &prefix = configuration->conf.getSubscriptionRolePrefix();
+    return prefix.c_str();
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/c/c_Result.cc
b/pulsar-client-cpp/lib/c/c_Result.cc
new file mode 100644
index 0000000..157a91f
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Result.cc
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/result.h>
+#include <pulsar/Result.h>
+
+// Converts the C result code to pulsar::Result and returns pulsar::strResult() for it.
+const char *pulsar_result_str(pulsar_result result) {
+    const pulsar::Result native = static_cast<pulsar::Result>(result);
+    return pulsar::strResult(native);
+}
diff --git a/pulsar-client-cpp/lib/c/c_structs.h
b/pulsar-client-cpp/lib/c/c_structs.h
new file mode 100644
index 0000000..b207ab4
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/c/result.h>
+#include <pulsar/Client.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/bind.hpp>
+
+struct _pulsar_client {  // C-side wrapper around the C++ client object
+ boost::scoped_ptr<pulsar::Client> client;  // held by scoped_ptr, so the Client is deleted with the wrapper
+};
+
+struct _pulsar_client_configuration {  // C-side wrapper holding a ClientConfiguration by value
+ pulsar::ClientConfiguration conf;
+};
+
+struct _pulsar_producer {  // C-side wrapper holding a pulsar::Producer by value
+ pulsar::Producer producer;  // target of the pulsar_producer_* calls
+};
+
+struct _pulsar_producer_configuration {  // C-side wrapper holding a ProducerConfiguration by value
+ pulsar::ProducerConfiguration conf;  // read/written by the pulsar_producer_configuration_* calls
+};
+
+struct _pulsar_consumer {  // C-side wrapper holding a pulsar::Consumer by value
+ pulsar::Consumer consumer;
+};
+
+struct _pulsar_consumer_configuration {  // C-side wrapper holding a ConsumerConfiguration by value
+ pulsar::ConsumerConfiguration consumerConfiguration;  // NOTE: named differently from the `conf` member of the sibling configuration structs
+};
+
+struct _pulsar_reader {  // C-side wrapper holding a pulsar::Reader by value
+ pulsar::Reader reader;  // target of the pulsar_reader_* calls
+};
+
+struct _pulsar_reader_configuration {  // C-side wrapper holding a ReaderConfiguration by value
+ pulsar::ReaderConfiguration conf;  // read/written by the pulsar_reader_configuration_* calls
+};
+
+struct _pulsar_message {  // C-side message wrapper used for both outgoing and received messages
+ pulsar::MessageBuilder builder;  // the producer send functions call builder.build() to produce `message`
+ pulsar::Message message;  // built message (send path) or the message copied in by reader/listener code
+};
+
+struct _pulsar_message_id {  // C-side wrapper holding a pulsar::MessageId by value
+ pulsar::MessageId messageId;
+};
+
+struct _pulsar_authentication {  // C-side wrapper holding a pulsar::AuthenticationPtr
+ pulsar::AuthenticationPtr auth;
+};
+
+struct _pulsar_topic_metadata {  // C-side view of a TopicMetadata; stores only a raw pointer to it
+ const pulsar::TopicMetadata* metadata;  // raw pointer, never deleted by the wrapper; must outlive any use of it
+};
+
+typedef void (*pulsar_result_callback)(pulsar_result);  // C callback receiving only a result code; used by handle_result_callback
+
+// Adapts a C++ result notification to a C callback: converts `result` to pulsar_result and
+// invokes `callback` with it. A null `callback` is skipped so that it is never invoked.
+// `inline` avoids unused-function warnings in translation units that include this header
+// without using the helper.
+static inline void handle_result_callback(pulsar::Result result, pulsar_result_callback callback) {
+    if (callback) {
+        callback(static_cast<pulsar_result>(result));
+    }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
[email protected].