This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f8594e0 Refactored C++ tests (#3003) f8594e0 is described below commit f8594e09b2e96729c396628440cb2d566db44a21 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Nov 20 09:57:05 2018 -0800 Refactored C++ tests (#3003) * Refactored C++ tests * Refactored python tests * Fixed format * Mount maven repo from outside docker * Fixed running test inside docker * Wait to ensure change in dedup state is picked up * Fixed ClientDeduplicationTest * Fixed formatting --- bin/pulsar-perf | 2 +- pulsar-client-cpp/pulsar-test-service-start.sh | 98 +++++++++ ...client-ssl.conf => pulsar-test-service-stop.sh} | 15 +- pulsar-client-cpp/python/pulsar_test.py | 218 +++++++++++---------- pulsar-client-cpp/run-unit-tests.sh | 45 +---- pulsar-client-cpp/test-conf/client-ssl.conf | 11 +- pulsar-client-cpp/test-conf/standalone-ssl.conf | 20 +- pulsar-client-cpp/tests/AuthPluginTest.cc | 92 +++++---- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 176 +++++++++-------- pulsar-client-cpp/tests/BatchMessageTest.cc | 23 ++- pulsar-client-cpp/tests/BinaryLookupServiceTest.cc | 28 ++- pulsar-client-cpp/tests/ClientDeduplicationTest.cc | 34 ++-- pulsar-client-cpp/tests/ClientTest.cc | 2 +- .../tests/ConsumerConfigurationTest.cc | 16 +- pulsar-client-cpp/tests/ConsumerStatsTest.cc | 14 +- pulsar-client-cpp/tests/ReaderTest.cc | 19 +- pulsar-client-cpp/tests/ZeroQueueSizeTest.cc | 6 +- .../pulsar/testclient/PerformanceConsumer.java | 9 - .../pulsar/testclient/PerformanceProducer.java | 9 - 19 files changed, 461 insertions(+), 376 deletions(-) diff --git a/bin/pulsar-perf b/bin/pulsar-perf index 7a250b7..7c32843 100755 --- a/bin/pulsar-perf +++ b/bin/pulsar-perf @@ -21,7 +21,7 @@ BINDIR=$(dirname "$0") PULSAR_HOME=`cd $BINDIR/..;pwd` -DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf +DEFAULT_CLIENT_CONF=${PULSAR_CLIENT_CONF:-"$PULSAR_HOME/conf/client.conf"} DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ] diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh b/pulsar-client-cpp/pulsar-test-service-start.sh new file mode 100755 index 0000000..914354a --- /dev/null +++ b/pulsar-client-cpp/pulsar-test-service-start.sh @@ -0,0 +1,98 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +SRC_DIR=$(git rev-parse --show-toplevel) +cd $SRC_DIR + +if [ -f /.dockerenv ]; then + # When running tests inside docker. Unpack the pulsar tgz + # because otherwise the classpath might not be correct + # in picking up the jars from local maven repo + export PULSAR_DIR=/tmp/pulsar-test-dist + rm -rf $PULSAR_DIR + mkdir $PULSAR_DIR + TGZ=$(ls -1 $SRC_DIR/distribution/server/target/apache-pulsar*bin.tar.gz | head -1) + tar xfz $TGZ -C $PULSAR_DIR --strip-components 1 +else + export PULSAR_DIR=$SRC_DIR +fi + +DATA_DIR=/tmp/pulsar-test-data +rm -rf $DATA_DIR +mkdir -p $DATA_DIR + +# Copy TLS test certificates +mkdir -p $DATA_DIR/certs +cp $SRC_DIR/pulsar-broker/src/test/resources/authentication/tls/*.pem $DATA_DIR/certs + +export PULSAR_STANDALONE_CONF=$SRC_DIR/pulsar-client-cpp/test-conf/standalone-ssl.conf +$PULSAR_DIR/bin/pulsar-daemon start standalone \ + --no-functions-worker --no-stream-storage \ + --zookeeper-dir $DATA_DIR/zookeeper \ + --bookkeeper-dir $DATA_DIR/bookkeeper + +echo "-- Wait for Pulsar service to be ready" +until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done + +echo "-- Pulsar service is ready -- Configure permissions" + +export PULSAR_CLIENT_CONF=$SRC_DIR/pulsar-client-cpp/test-conf/client-ssl.conf + +# Create "standalone" cluster +$PULSAR_DIR/bin/pulsar-admin clusters create \ + standalone \ + --url http://localhost:8080/ \ + --url-secure https://localhost:8443/ \ + --broker-url pulsar://localhost:6650/ \ + --broker-url-secure pulsar+ssl://localhost:6651/ + +# Create "public" tenant +$PULSAR_DIR/bin/pulsar-admin tenants create public -r "anonymous" -c "standalone" + +# Create "public/default" with no auth required +$PULSAR_DIR/bin/pulsar-admin namespaces create public/default \ + --clusters standalone +$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default \ + --actions produce,consume \ + --role "anonymous" + +# Create "public/default-2" with no auth required +$PULSAR_DIR/bin/pulsar-admin namespaces create public/default-2 \ + --clusters standalone +$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-2 \ + --actions produce,consume \ + --role "anonymous" + +# Create "public/default-3" with no auth required +$PULSAR_DIR/bin/pulsar-admin namespaces create public/default-3 \ + --clusters standalone +$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default-3 \ + --actions produce,consume \ + --role "anonymous" + +# Create "private" tenant +$PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c "standalone" + +# Create "private/auth" with required authentication +$PULSAR_DIR/bin/pulsar-admin namespaces create private/auth --clusters standalone + +echo "-- Ready to start tests" diff --git a/pulsar-client-cpp/test-conf/client-ssl.conf b/pulsar-client-cpp/pulsar-test-service-stop.sh old mode 100644 new mode 100755 similarity index 61% copy from pulsar-client-cpp/test-conf/client-ssl.conf copy to pulsar-client-cpp/pulsar-test-service-stop.sh index d4071b4..ff67a61 --- a/pulsar-client-cpp/test-conf/client-ssl.conf +++ b/pulsar-client-cpp/pulsar-test-service-stop.sh @@ -1,3 +1,4 @@ +#!/bin/bash # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -17,11 +18,9 @@ # under the License. # -# Pulsar Client configuration -webServiceUrl=https://localhost:9766/ -brokerServiceUrl=pulsar+ssl://localhost:9886/ -useTls=true -tlsAllowInsecureConnection=true -tlsTrustCertsFilePath=/pulsar/pulsar-broker/src/test/resources/authentication/tls/cacert.pem -authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls -authParams=tlsCertFile:/pulsar/pulsar-broker/src/test/resources/authentication/tls/client-cert.pem,tlsKeyFile:/pulsar/pulsar-broker/src/test/resources/authentication/tls/client-key.pem +set -e + +ROOT_DIR=$(git rev-parse --show-toplevel) +cd $ROOT_DIR + +bin/pulsar-daemon stop standalone diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 86bad9a..e9ecbc1 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -41,20 +41,32 @@ def doHttpPost(url, data): req.add_header('Content-Type', 'application/json') urlopen(req) -import urllib2 + def doHttpPut(url, data): - opener = urllib2.build_opener(urllib2.HTTPHandler) - request = urllib2.Request(url, data=data.encode()) - request.add_header('Content-Type', 'application/json') - request.get_method = lambda: 'PUT' - opener.open(request) + try: + req = Request(url, data.encode()) + req.add_header('Content-Type', 'application/json') + req.get_method = lambda: 'PUT' + urlopen(req) + except Exception as ex: + # ignore conflicts exception to have test idempotency + if '409' in str(ex): + pass + else: + raise ex + + +def doHttpGet(url): + req = Request(url) + req.add_header('Accept', 'application/json') + return urlopen(req).read() class PulsarTest(TestCase): - serviceUrl = 'pulsar://localhost:8885' - adminUrl = 'http://localhost:8765' + serviceUrl = 'pulsar://localhost:6650' + adminUrl = 'http://localhost:8080' - serviceUrlTls = 'pulsar+ssl://localhost:9886' + serviceUrlTls = 'pulsar+ssl://localhost:6651' def test_producer_config(self): conf = ProducerConfiguration() @@ -80,23 +92,23 @@ class PulsarTest(TestCase): def test_simple_producer(self): client = Client(self.serviceUrl) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic') - producer.send('hello') + producer = client.create_producer('my-python-topic') + producer.send(b'hello') producer.close() client.close() def test_producer_send_async(self): client = Client(self.serviceUrl) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic') + producer = client.create_producer('my-python-topic') sent_messages = [] def send_callback(producer, msg): sent_messages.append(msg) - producer.send_async('hello', send_callback) - producer.send_async('hello', send_callback) - producer.send_async('hello', send_callback) + producer.send_async(b'hello', send_callback) + producer.send_async(b'hello', send_callback) + producer.send_async(b'hello', send_callback) i = 0 while len(sent_messages) < 3 and i < 100: @@ -107,11 +119,11 @@ class PulsarTest(TestCase): def test_producer_consumer(self): client = Client(self.serviceUrl) - consumer = client.subscribe('persistent://sample/standalone/ns/my-python-topic-producer-consumer', + consumer = client.subscribe('my-python-topic-producer-consumer', 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-producer-consumer') - producer.send('hello') + producer = client.create_producer('my-python-topic-producer-consumer') + producer.send(b'hello') msg = consumer.receive(1000) self.assertTrue(msg) @@ -134,11 +146,11 @@ class PulsarTest(TestCase): tls_allow_insecure_connection=False, authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem')) - consumer = client.subscribe('persistent://property/cluster/namespace/my-python-topic-producer-consumer', + consumer = client.subscribe('my-python-topic-producer-consumer', 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('persistent://property/cluster/namespace/my-python-topic-producer-consumer') - producer.send('hello') + producer = client.create_producer('my-python-topic-producer-consumer') + producer.send(b'hello') msg = consumer.receive(1000) self.assertTrue(msg) @@ -164,11 +176,11 @@ class PulsarTest(TestCase): tls_allow_insecure_connection=False, authentication=Authentication(authPlugin, authParams)) - consumer = client.subscribe('persistent://property/cluster/namespace/my-python-topic-producer-consumer', + consumer = client.subscribe('my-python-topic-producer-consumer', 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('persistent://property/cluster/namespace/my-python-topic-producer-consumer') - producer.send('hello') + producer = client.create_producer('my-python-topic-producer-consumer') + producer.send(b'hello') msg = consumer.receive(1000) self.assertTrue(msg) @@ -194,11 +206,11 @@ class PulsarTest(TestCase): tls_allow_insecure_connection=False, authentication=Authentication(authPlugin, authParams)) - consumer = client.subscribe('persistent://property/cluster/namespace/my-python-topic-producer-consumer', + consumer = client.subscribe('my-python-topic-producer-consumer', 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('persistent://property/cluster/namespace/my-python-topic-producer-consumer') - producer.send('hello') + producer = client.create_producer('my-python-topic-producer-consumer') + producer.send(b'hello') msg = consumer.receive(1000) self.assertTrue(msg) @@ -223,7 +235,7 @@ class PulsarTest(TestCase): tls_allow_insecure_connection=False, authentication=Authentication(authPlugin, authParams)) try: - client.subscribe('persistent://property/cluster/namespace/my-python-topic-producer-consumer', + client.subscribe('my-python-topic-producer-consumer', 'my-sub', consumer_type=ConsumerType.Shared) except: @@ -239,14 +251,14 @@ class PulsarTest(TestCase): received_messages.append(msg) consumer.acknowledge(msg) - client.subscribe('persistent://sample/standalone/ns/my-python-topic-listener', + client.subscribe('my-python-topic-listener', 'my-sub', consumer_type=ConsumerType.Exclusive, message_listener=listener) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-listener') - producer.send('hello-1') - producer.send('hello-2') - producer.send('hello-3') + producer = client.create_producer('my-python-topic-listener') + producer.send(b'hello-1') + producer.send(b'hello-2') + producer.send(b'hello-3') time.sleep(0.1) self.assertEqual(len(received_messages), 3) @@ -257,11 +269,11 @@ class PulsarTest(TestCase): def test_reader_simple(self): client = Client(self.serviceUrl) - reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-simple', + reader = client.create_reader('my-python-topic-reader-simple', MessageId.earliest) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-simple') - producer.send('hello') + producer = client.create_producer('my-python-topic-reader-simple') + producer.send(b'hello') msg = reader.read_next() self.assertTrue(msg) @@ -278,16 +290,16 @@ class PulsarTest(TestCase): def test_reader_on_last_message(self): client = Client(self.serviceUrl) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message') + producer = client.create_producer('my-python-topic-reader-on-last-message') for i in range(10): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) - reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-on-last-message', + reader = client.create_reader('my-python-topic-reader-on-last-message', MessageId.latest) for i in range(10, 20): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) for i in range(10, 20): msg = reader.read_next() @@ -300,13 +312,13 @@ class PulsarTest(TestCase): def test_reader_on_specific_message(self): client = Client(self.serviceUrl) producer = client.create_producer( - 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message') + 'my-python-topic-reader-on-specific-message') for i in range(10): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) reader1 = client.create_reader( - 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message', + 'my-python-topic-reader-on-specific-message', MessageId.earliest) for i in range(5): @@ -314,7 +326,7 @@ class PulsarTest(TestCase): last_msg_id = msg.message_id() reader2 = client.create_reader( - 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message', + 'my-python-topic-reader-on-specific-message', last_msg_id) for i in range(5, 10): @@ -329,18 +341,18 @@ class PulsarTest(TestCase): def test_reader_on_specific_message_with_batches(self): client = Client(self.serviceUrl) producer = client.create_producer( - 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches', + 'my-python-topic-reader-on-specific-message-with-batches', batching_enabled=True, batching_max_publish_delay_ms=1000) for i in range(10): - producer.send_async('hello-%d' % i, None) + producer.send_async(b'hello-%d' % i, None) # Send one sync message to make sure everything was published - producer.send('hello-10') + producer.send(b'hello-10') reader1 = client.create_reader( - 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches', + 'my-python-topic-reader-on-specific-message-with-batches', MessageId.earliest) for i in range(5): @@ -348,7 +360,7 @@ class PulsarTest(TestCase): last_msg_id = msg.message_id() reader2 = client.create_reader( - 'persistent://sample/standalone/ns/my-python-topic-reader-on-specific-message-with-batches', + 'my-python-topic-reader-on-specific-message-with-batches', last_msg_id) for i in range(5, 11): @@ -362,18 +374,18 @@ class PulsarTest(TestCase): def test_producer_sequence_after_reconnection(self): # Enable deduplication on namespace - doHttpPost(self.adminUrl + '/admin/namespaces/sample/standalone/ns1/deduplication', + doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication', 'true') client = Client(self.serviceUrl) - topic = 'persistent://sample/standalone/ns1/my-python-test-producer-sequence-after-reconnection-' \ + topic = 'my-python-test-producer-sequence-after-reconnection-' \ + str(time.time()) producer = client.create_producer(topic, producer_name='my-producer-name') self.assertEqual(producer.last_sequence_id(), -1) for i in range(10): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) self.assertEqual(producer.last_sequence_id(), i) producer.close() @@ -382,30 +394,33 @@ class PulsarTest(TestCase): self.assertEqual(producer.last_sequence_id(), 9) for i in range(10, 20): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) self.assertEqual(producer.last_sequence_id(), i) + doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication', + 'false') + def test_producer_deduplication(self): # Enable deduplication on namespace - doHttpPost(self.adminUrl + '/admin/namespaces/sample/standalone/ns1/deduplication', + doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication', 'true') client = Client(self.serviceUrl) - topic = 'persistent://sample/standalone/ns1/my-python-test-producer-deduplication-' + str(time.time()) + topic = 'my-python-test-producer-deduplication-' + str(time.time()) producer = client.create_producer(topic, producer_name='my-producer-name') self.assertEqual(producer.last_sequence_id(), -1) consumer = client.subscribe(topic, 'my-sub') - producer.send('hello-0', sequence_id=0) - producer.send('hello-1', sequence_id=1) - producer.send('hello-2', sequence_id=2) + producer.send(b'hello-0', sequence_id=0) + producer.send(b'hello-1', sequence_id=1) + producer.send(b'hello-2', sequence_id=2) self.assertEqual(producer.last_sequence_id(), 2) # Repeat the messages and verify they're not received by consumer - producer.send('hello-1', sequence_id=1) - producer.send('hello-2', sequence_id=2) + producer.send(b'hello-1', sequence_id=1) + producer.send(b'hello-2', sequence_id=2) self.assertEqual(producer.last_sequence_id(), 2) for i in range(3): @@ -427,8 +442,8 @@ class PulsarTest(TestCase): self.assertEqual(producer.last_sequence_id(), 2) # Repeat the messages and verify they're not received by consumer - producer.send('hello-1', sequence_id=1) - producer.send('hello-2', sequence_id=2) + producer.send(b'hello-1', sequence_id=1) + producer.send(b'hello-2', sequence_id=2) self.assertEqual(producer.last_sequence_id(), 2) try: @@ -439,6 +454,9 @@ class PulsarTest(TestCase): # Exception is expected pass + doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication', + 'false') + def test_producer_routing_mode(self): client = Client(self.serviceUrl) producer = client.create_producer('my-python-test-producer', @@ -448,7 +466,7 @@ class PulsarTest(TestCase): def test_message_argument_errors(self): client = Client(self.serviceUrl) - topic = 'persistent://sample/standalone/ns1/my-python-test-producer' + topic = 'my-python-test-producer' producer = client.create_producer(topic) content = 'test'.encode('utf-8') @@ -478,7 +496,7 @@ class PulsarTest(TestCase): self._check_value_error(lambda: client.create_producer(None)) - topic = 'persistent://sample/standalone/ns1/my-python-test-producer' + topic = 'my-python-test-producer' self._check_value_error(lambda: client.create_producer(topic, producer_name=5)) self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id='test')) @@ -495,7 +513,7 @@ class PulsarTest(TestCase): def test_consumer_argument_errors(self): client = Client(self.serviceUrl) - topic = 'persistent://sample/standalone/ns1/my-python-test-producer' + topic = 'my-python-test-producer' sub_name = 'my-sub-name' self._check_value_error(lambda: client.subscribe(None, sub_name)) @@ -509,7 +527,7 @@ class PulsarTest(TestCase): def test_reader_argument_errors(self): client = Client(self.serviceUrl) - topic = 'persistent://sample/standalone/ns1/my-python-test-producer' + topic = 'my-python-test-producer' # This should not raise exception client.create_reader(topic, MessageId.earliest) @@ -522,7 +540,7 @@ class PulsarTest(TestCase): def test_publish_compact_and_consume(self): client = Client(self.serviceUrl) - topic = 'persistent://sample/standalone/ns1/my-python-test_publish_compact_and_consume' + topic = 'my-python-test_publish_compact_and_consume' producer = client.create_producer(topic, producer_name='my-producer-name', batching_enabled=False) self.assertEqual(producer.last_sequence_id(), -1) consumer = client.subscribe(topic, 'my-sub1', is_read_compacted=True) @@ -530,17 +548,15 @@ class PulsarTest(TestCase): consumer2 = client.subscribe(topic, 'my-sub2', is_read_compacted=False) # producer create 2 messages with same key. - producer.send('hello-0', partition_key='key0') - producer.send('hello-1', partition_key='key0') + producer.send(b'hello-0', partition_key='key0') + producer.send(b'hello-1', partition_key='key0') producer.close() # issue compact command, and wait success - url=self.adminUrl + '/admin/persistent/sample/standalone/ns1/my-python-test_publish_compact_and_consume/compaction' + url=self.adminUrl + '/admin/v2/persistent/public/default/my-python-test_publish_compact_and_consume/compaction' doHttpPut(url, '') while True: - req = urllib2.Request(url) - response = urllib2.urlopen(req) - s=response.read() + s=doHttpGet(url).decode('utf-8') if 'RUNNING' in s: print("Compact still running") print(s) @@ -571,15 +587,15 @@ class PulsarTest(TestCase): def test_reader_has_message_available(self): # create client, producer, reader client = Client(self.serviceUrl) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-reader-has-message-available') - reader = client.create_reader('persistent://sample/standalone/ns/my-python-topic-reader-has-message-available', + producer = client.create_producer('my-python-topic-reader-has-message-available') + reader = client.create_reader('my-python-topic-reader-has-message-available', MessageId.latest) # before produce data, expected not has message available self.assertFalse(reader.has_message_available()); for i in range(10): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) # produced data, expected has message available self.assertTrue(reader.has_message_available()); @@ -593,7 +609,7 @@ class PulsarTest(TestCase): self.assertFalse(reader.has_message_available()); for i in range(10, 20): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) # produced data again, expected has message available self.assertTrue(reader.has_message_available()); @@ -603,13 +619,13 @@ class PulsarTest(TestCase): def test_seek(self): client = Client(self.serviceUrl) - consumer = client.subscribe('persistent://sample/standalone/ns/my-python-topic-seek', + consumer = client.subscribe('my-python-topic-seek', 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-seek') + producer = client.create_producer('my-python-topic-seek') for i in range(100): - producer.send('hello-%d' % i) + producer.send(b'hello-%d' % i) for i in range(100): msg = consumer.receive() @@ -635,7 +651,7 @@ class PulsarTest(TestCase): 'my-sub', consumer_type=ConsumerType.Shared) producer = client.create_producer('my-v2-topic-producer-consumer') - producer.send('hello') + producer.send(b'hello') msg = consumer.receive(1000) self.assertTrue(msg) @@ -652,14 +668,14 @@ class PulsarTest(TestCase): def test_topics_consumer(self): client = Client(self.serviceUrl) - topic1 = 'persistent://sample/standalone/ns/my-python-topics-consumer-1' - topic2 = 'persistent://sample/standalone/ns/my-python-topics-consumer-2' - topic3 = 'persistent://sample/standalone/ns/my-python-topics-consumer-3' + topic1 = 'persistent://public/default/my-python-topics-consumer-1' + topic2 = 'persistent://public/default/my-python-topics-consumer-2' + topic3 = 'persistent://public/default/my-python-topics-consumer-3' topics = [topic1, topic2, topic3] - url1 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-1/partitions' - url2 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-2/partitions' - url3 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-topics-consumer-3/partitions' + url1 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions' + url2 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions' + url3 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-3/partitions' doHttpPut(url1, '2') doHttpPut(url2, '3') @@ -676,13 +692,13 @@ class PulsarTest(TestCase): ) for i in range(100): - producer1.send('hello-1-%d' % i) + producer1.send(b'hello-1-%d' % i) for i in range(100): - producer2.send('hello-2-%d' % i) + producer2.send(b'hello-2-%d' % i) for i in range(100): - producer3.send('hello-3-%d' % i) + producer3.send(b'hello-3-%d' % i) for i in range(300): @@ -702,15 +718,15 @@ class PulsarTest(TestCase): import re client = Client(self.serviceUrl) - topics_pattern = 'persistent://sample/standalone/ns/my-python-pattern-consumer.*' + topics_pattern = 'persistent://public/default/my-python-pattern-consumer.*' - topic1 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-1' - topic2 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-2' - topic3 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-3' + topic1 = 'persistent://public/default/my-python-pattern-consumer-1' + topic2 = 'persistent://public/default/my-python-pattern-consumer-2' + topic3 = 'persistent://public/default/my-python-pattern-consumer-3' - url1 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-1/partitions' - url2 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-2/partitions' - url3 = self.adminUrl + '/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-3/partitions' + url1 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-pattern-consumer-1/partitions' + url2 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-pattern-consumer-2/partitions' + url3 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-pattern-consumer-3/partitions' doHttpPut(url1, '2') doHttpPut(url2, '3') @@ -731,13 +747,13 @@ class PulsarTest(TestCase): time.sleep(2) for i in range(100): - producer1.send('hello-1-%d' % i) + producer1.send(b'hello-1-%d' % i) for i in range(100): - producer2.send('hello-2-%d' % i) + producer2.send(b'hello-2-%d' % i) for i in range(100): - producer3.send('hello-3-%d' % i) + producer3.send(b'hello-3-%d' % i) for i in range(300): @@ -763,7 +779,7 @@ class PulsarTest(TestCase): def test_get_topics_partitions(self): client = Client(self.serviceUrl) topic_partitioned = 'persistent://public/default/test_get_topics_partitions' - topic_non_partitioned = 'persistent://public/default/test_get_topics_partitions' + topic_non_partitioned = 'persistent://public/default/test_get_topics_not-partitioned' url1 = self.adminUrl + '/admin/v2/persistent/public/default/test_get_topics_partitions/partitions' doHttpPut(url1, '3') diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh index cffe002..e9caefa 100755 --- a/pulsar-client-cpp/run-unit-tests.sh +++ b/pulsar-client-cpp/run-unit-tests.sh @@ -18,44 +18,12 @@ # under the License. # -# Start 2 Pulsar standalone instances (one with TLS and one without) -# and execute the unit tests +set -e -rm -rf ./pulsar-dist -mkdir pulsar-dist -tar xfz ../distribution/server/target/apache-pulsar*bin.tar.gz -C pulsar-dist --strip-components 1 +ROOT_DIR=$(git rev-parse --show-toplevel) +cd $ROOT_DIR/pulsar-client-cpp -PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone.conf pulsar-dist/bin/pulsar standalone --no-functions-worker --no-stream-storage > broker.log & -standalone_pid=$!; - -PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone-ssl.conf pulsar-dist/bin/pulsar standalone \ - --no-functions-worker \ - --no-stream-storage \ - --zookeeper-port 2191 --bookkeeper-port 3191 \ - --zookeeper-dir data2/standalone/zookeeper --bookkeeper-dir \ - data2/standalone/bookkeeper > broker-tls.log & -auth_pid=$!; - -echo "Wait for non-tls standalone up" -until grep "Created tenant public" broker.log; do sleep 5; done - -# create property for test -PULSAR_CLIENT_CONF=$PWD/test-conf/client.conf pulsar-dist/bin/pulsar-admin tenants create prop -r "" -c "unit" -echo "Created tenant 'prop' - $?" - -PULSAR_CLIENT_CONF=$PWD/test-conf/client.conf pulsar-dist/bin/pulsar-admin tenants create property -r "" -c "cluster" -echo "Created tenant 'property' - $?" - -PULSAR_CLIENT_CONF=$PWD/test-conf/client-ssl.conf pulsar-dist/bin/pulsar-admin clusters create \ - --url http://localhost:9765/ --url-secure https://localhost:9766/ \ - --broker-url pulsar://localhost:9885/ --broker-url-secure pulsar+ssl://localhost:9886/ \ - cluster -PULSAR_CLIENT_CONF=$PWD/test-conf/client-ssl.conf pulsar-dist/bin/pulsar-admin clusters create \ - --url http://localhost:9765/ --url-secure https://localhost:9766/ \ - --broker-url pulsar://localhost:9885/ --broker-url-secure pulsar+ssl://localhost:9886/ \ - unit - -sleep 5 +./pulsar-test-service-start.sh pushd tests @@ -94,11 +62,8 @@ if [ $RES -eq 0 ]; then popd popd - fi -kill -9 $standalone_pid $auth_pid - -rm -rf pulsar-dist +./pulsar-test-service-stop.sh exit $RES diff --git a/pulsar-client-cpp/test-conf/client-ssl.conf b/pulsar-client-cpp/test-conf/client-ssl.conf index d4071b4..6ca0e5a 100644 --- a/pulsar-client-cpp/test-conf/client-ssl.conf +++ b/pulsar-client-cpp/test-conf/client-ssl.conf @@ -18,10 +18,9 @@ # # Pulsar Client configuration -webServiceUrl=https://localhost:9766/ -brokerServiceUrl=pulsar+ssl://localhost:9886/ -useTls=true -tlsAllowInsecureConnection=true -tlsTrustCertsFilePath=/pulsar/pulsar-broker/src/test/resources/authentication/tls/cacert.pem +webServiceUrl=https://localhost:8443/ +brokerServiceUrl=pulsar+ssl://localhost:6651/ +tlsAllowInsecureConnection=false +tlsTrustCertsFilePath=/tmp/pulsar-test-data/certs/cacert.pem authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls -authParams=tlsCertFile:/pulsar/pulsar-broker/src/test/resources/authentication/tls/client-cert.pem,tlsKeyFile:/pulsar/pulsar-broker/src/test/resources/authentication/tls/client-key.pem +authParams=tlsCertFile:/tmp/pulsar-test-data/certs/client-cert.pem,tlsKeyFile:/tmp/pulsar-test-data/certs/client-key.pem diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf index bb097d6..da9123c 100644 --- a/pulsar-client-cpp/test-conf/standalone-ssl.conf +++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf @@ -28,12 +28,12 @@ globalZookeeperServers= # Configuration Store connection string configurationStoreServers= -brokerServicePort=9885 -brokerServicePortTls=9886 +brokerServicePort=6650 +brokerServicePortTls=6651 # Port to use to server HTTP request -webServicePort=9765 -webServicePortTls=9766 +webServicePort=8080 +webServicePortTls=8443 # Hostname or IP address the service binds on, default is 0.0.0.0. bindAddress=0.0.0.0 @@ -42,7 +42,7 @@ bindAddress=0.0.0.0 advertisedAddress=localhost # Name of the cluster to which this broker belongs to -clusterName=cluster +clusterName=standalone # Zookeeper session timeout in milliseconds zooKeeperSessionTimeoutMillis=30000 @@ -87,10 +87,12 @@ maxUnackedMessagesPerConsumer=50000 # Enable TLS tlsEnabled=true -tlsCertificateFilePath=/pulsar/pulsar-broker/src/test/resources/authentication/tls/broker-cert.pem -tlsKeyFilePath=/pulsar/pulsar-broker/src/test/resources/authentication/tls/broker-key.pem -tlsTrustCertsFilePath=/pulsar/pulsar-broker/src/test/resources/authentication/tls/cacert.pem -tlsAllowInsecureConnection=true +tlsCertificateFilePath=/tmp/pulsar-test-data/certs/broker-cert.pem +tlsKeyFilePath=/tmp/pulsar-test-data/certs/broker-key.pem +tlsTrustCertsFilePath=/tmp/pulsar-test-data/certs/cacert.pem +tlsAllowInsecureConnection=false + +anonymousUserRole=anonymous # Enable authentication authenticationEnabled=true diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc index 295e624..eef541c 100644 --- a/pulsar-client-cpp/tests/AuthPluginTest.cc +++ b/pulsar-client-cpp/tests/AuthPluginTest.cc @@ -32,7 +32,14 @@ DECLARE_LOG_OBJECT() using namespace pulsar; int globalTestTlsMessagesCounter = 0; -static std::string lookupUrlTls = "pulsar+ssl://localhost:9886"; +static const std::string serviceUrlTls = "pulsar+ssl://localhost:6651"; +static const std::string serviceUrlHttps = "https://localhost:8443"; + +static const std::string caPath = "../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem"; +static const std::string clientPublicKeyPath = + "../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem"; +static const std::string clientPrivateKeyPath = + "../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem"; static void sendCallBackTls(Result r, const Message& msg) { ASSERT_EQ(r, ResultOk); @@ -44,12 +51,9 @@ static void sendCallBackTls(Result r, const Message& msg) { TEST(AuthPluginTest, testTls) { ClientConfiguration config = ClientConfiguration(); - config.setUseTls(true); - config.setTlsTrustCertsFilePath("../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem"); + config.setTlsTrustCertsFilePath(caPath); config.setTlsAllowInsecureConnection(false); - AuthenticationPtr auth = - pulsar::AuthTls::create("../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem", - "../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem"); + AuthenticationPtr auth = pulsar::AuthTls::create(clientPublicKeyPath, clientPrivateKeyPath); ASSERT_TRUE(auth != NULL); ASSERT_EQ(auth->getAuthMethodName(), "tls"); @@ -61,9 +65,9 @@ TEST(AuthPluginTest, testTls) { ASSERT_EQ(auth.use_count(), 1); config.setAuth(auth); - Client client(lookupUrlTls, config); + Client client(serviceUrlTls, config); - std::string topicName = "persistent://property/cluster/namespace/test-tls"; + std::string topicName = "persistent://private/auth/test-tls"; std::string subName = "subscription-name"; int numOfMessages = 10; @@ -122,16 +126,13 @@ TEST(AuthPluginTest, testTls) { TEST(AuthPluginTest, testTlsDetectPulsarSsl) { ClientConfiguration config = ClientConfiguration(); - config.setTlsTrustCertsFilePath("../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem"); + config.setTlsTrustCertsFilePath(caPath); config.setTlsAllowInsecureConnection(false); - AuthenticationPtr auth = - pulsar::AuthTls::create("../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem", - "../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem"); - config.setAuth(auth); + config.setAuth(pulsar::AuthTls::create(clientPublicKeyPath, clientPrivateKeyPath)); - Client client("pulsar+ssl://localhost:9886", config); + Client client(serviceUrlTls, config); - std::string topicName = "persistent://property/cluster/namespace/test-tls-detect"; + std::string topicName = "persistent://private/auth/test-tls-detect"; Producer producer; Promise<Result, Producer> producerPromise; @@ -144,16 +145,13 @@ TEST(AuthPluginTest, testTlsDetectPulsarSsl) { TEST(AuthPluginTest, testTlsDetectHttps) { ClientConfiguration config = ClientConfiguration(); config.setUseTls(true); // shouldn't be needed soon - config.setTlsTrustCertsFilePath("../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem"); + config.setTlsTrustCertsFilePath(caPath); config.setTlsAllowInsecureConnection(false); - AuthenticationPtr auth = - pulsar::AuthTls::create("../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem", - "../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem"); - config.setAuth(auth); + config.setAuth(pulsar::AuthTls::create(clientPublicKeyPath, clientPrivateKeyPath)); - Client client("https://localhost:9766", config); + Client client(serviceUrlHttps, config); - std::string topicName = "persistent://property/cluster/namespace/test-tls-detect-https"; + std::string topicName = "persistent://private/auth/test-tls-detect-https"; Producer producer; Promise<Result, Producer> producerPromise; @@ -165,12 +163,17 @@ TEST(AuthPluginTest, testTlsDetectHttps) { namespace testAthenz { std::string principalToken; -void mockZTS() { +void mockZTS(int port) { + LOG_INFO("-- MockZTS started"); boost::asio::io_service io; boost::asio::ip::tcp::iostream stream; boost::asio::ip::tcp::acceptor acceptor(io, - boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9999)); + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)); + + LOG_INFO("-- MockZTS waiting for connnection"); acceptor.accept(*stream.rdbuf()); + LOG_INFO("-- MockZTS got connection"); + std::string headerLine; while (getline(stream, headerLine)) { std::vector<std::string> kv; @@ -190,19 +193,24 @@ void mockZTS() { break; } } + + LOG_INFO("-- MockZTS exiting"); } } // namespace testAthenz TEST(AuthPluginTest, testAthenz) { - boost::thread zts(&testAthenz::mockZTS); + boost::thread zts(boost::bind(&testAthenz::mockZTS, 9999)); pulsar::AuthenticationDataPtr data; std::string params = R"({ "tenantDomain": "pulsar.test.tenant", "tenantService": "service", "providerDomain": "pulsar.test.provider", - "privateKey": "file:../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem", + "privateKey": "file:)" + + clientPrivateKeyPath + R"(", "ztsUrl": "http://localhost:9999" })"; + + LOG_INFO("PARAMS: " << params); pulsar::AuthenticationPtr auth = pulsar::AuthAthenz::create(params); ASSERT_EQ(auth->getAuthMethodName(), "athenz"); ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk); @@ -237,23 +245,21 @@ TEST(AuthPluginTest, testDisable) { TEST(AuthPluginTest, testAuthFactoryTls) { pulsar::AuthenticationDataPtr data; - std::string tlsCertFile = "../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem"; - std::string tlsKeyFile = "../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem"; - AuthenticationPtr auth = - pulsar::AuthFactory::create("tls", "tlsCertFile:" + tlsCertFile + ",tlsKeyFile:" + tlsKeyFile); + AuthenticationPtr auth = pulsar::AuthFactory::create( + "tls", "tlsCertFile:" + clientPublicKeyPath + ",tlsKeyFile:" + clientPrivateKeyPath); ASSERT_EQ(auth->getAuthMethodName(), "tls"); ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk); ASSERT_EQ(data->hasDataForTls(), true); - ASSERT_EQ(data->getTlsCertificates(), tlsCertFile); - ASSERT_EQ(data->getTlsPrivateKey(), tlsKeyFile); + ASSERT_EQ(data->getTlsCertificates(), clientPublicKeyPath); + ASSERT_EQ(data->getTlsPrivateKey(), clientPrivateKeyPath); ClientConfiguration config = ClientConfiguration(); config.setAuth(auth); - config.setTlsTrustCertsFilePath("../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem"); + config.setTlsTrustCertsFilePath(caPath); config.setTlsAllowInsecureConnection(false); - Client client("pulsar+ssl://localhost:9886", config); + Client client(serviceUrlTls, config); - std::string topicName = "persistent://property/cluster/namespace/test-tls-factory"; + std::string topicName = "persistent://private/auth/test-tls-factory"; Producer producer; Promise<Result, Producer> producerPromise; client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise)); @@ -263,15 +269,17 @@ TEST(AuthPluginTest, testAuthFactoryTls) { } TEST(AuthPluginTest, testAuthFactoryAthenz) { - boost::thread zts(&testAthenz::mockZTS); + boost::thread zts(boost::bind(&testAthenz::mockZTS, 9998)); pulsar::AuthenticationDataPtr data; std::string params = R"({ - "tenantDomain": "pulsar.test.tenant", + "tenantDomain": "pulsar.test2.tenant", "tenantService": "service", "providerDomain": "pulsar.test.provider", - "privateKey": "file:../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem", - "ztsUrl": "http://localhost:9999" + "privateKey": "file:)" + + clientPrivateKeyPath + R"(", + "ztsUrl": "http://localhost:9998" })"; + LOG_INFO("PARAMS: " << params); pulsar::AuthenticationPtr auth = pulsar::AuthFactory::create("athenz", params); ASSERT_EQ(auth->getAuthMethodName(), "athenz"); ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk); @@ -279,14 +287,18 @@ TEST(AuthPluginTest, testAuthFactoryAthenz) { ASSERT_EQ(data->hasDataFromCommand(), true); ASSERT_EQ(data->getHttpHeaders(), "Athenz-Role-Auth: mockToken"); ASSERT_EQ(data->getCommandData(), "mockToken"); + + LOG_INFO("Calling zts.join()"); zts.join(); + LOG_INFO("Done zts.join()"); + std::vector<std::string> kvs; boost::algorithm::split(kvs, testAthenz::principalToken, boost::is_any_of(";")); for (std::vector<std::string>::iterator itr = kvs.begin(); itr != kvs.end(); itr++) { std::vector<std::string> kv; boost::algorithm::split(kv, *itr, boost::is_any_of("=")); if (kv[0] == "d") { - ASSERT_EQ(kv[1], "pulsar.test.tenant"); + ASSERT_EQ(kv[1], "pulsar.test2.tenant"); } else if (kv[0] == "n") { ASSERT_EQ(kv[1], "service"); } diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index c5f4420..e475ebc 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -45,8 +45,8 @@ boost::mutex mutex_; static int globalTestBatchMessagesCounter = 0; static int globalCount = 0; static long globalResendMessageCount = 0; -static std::string lookupUrl = "pulsar://localhost:8885"; -static std::string adminUrl = "http://localhost:8765/"; +static std::string lookupUrl = "pulsar://localhost:6650"; +static std::string adminUrl = "http://localhost:8080/"; static void messageListenerFunction(Consumer consumer, const Message& msg) { globalCount++; consumer.acknowledge(msg); @@ -113,7 +113,7 @@ class EncKeyReader : public CryptoKeyReader { TEST(BasicEndToEndTest, testBatchMessages) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/test-batch-messages"; + std::string topicName = "persistent://public/default/test-batch-messages"; std::string subName = "subscription-name"; Producer producer; @@ -200,7 +200,7 @@ void resendMessage(Result r, const Message msg, Producer producer) { TEST(BasicEndToEndTest, testProduceConsume) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/my-topic"; + std::string topicName = "persistent://public/default/test-produce-consume"; std::string subName = "my-sub-name"; Producer producer; @@ -238,7 +238,7 @@ TEST(BasicEndToEndTest, testProduceConsume) { } TEST(BasicEndToEndTest, testLookupThrottling) { - std::string topicName = "persistent://prop/unit/ns1/testLookupThrottling"; + std::string topicName = "testLookupThrottling"; ClientConfiguration config; config.setConcurrentLookupRequest(0); Client client(lookupUrl, config); @@ -264,7 +264,7 @@ TEST(BasicEndToEndTest, testNonExistingTopic) { } TEST(BasicEndToEndTest, testNonPersistentTopic) { - std::string topicName = "non-persistent://prop/unit/ns1/testNonPersistentTopic"; + std::string topicName = "non-persistent://public/default/testNonPersistentTopic"; Client client(lookupUrl); Producer producer; Result result = client.createProducer(topicName, producer); @@ -306,7 +306,7 @@ TEST(BasicEndToEndTest, testV2TopicHttp) { } TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions) { - std::string topicName = "persistent://prop/unit/ns1/testSingleClientMultipleSubscriptions"; + std::string topicName = "testSingleClientMultipleSubscriptions"; Client client(lookupUrl); @@ -325,7 +325,7 @@ TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions) { } TEST(BasicEndToEndTest, testMultipleClientsMultipleSubscriptions) { - std::string topicName = "persistent://prop/unit/ns1/testMultipleClientsMultipleSubscriptions"; + std::string topicName = "testMultipleClientsMultipleSubscriptions"; Client client1(lookupUrl); Client client2(lookupUrl); @@ -354,7 +354,7 @@ TEST(BasicEndToEndTest, testMultipleClientsMultipleSubscriptions) { } TEST(BasicEndToEndTest, testProduceAndConsumeAfterClientClose) { - std::string topicName = "persistent://prop/unit/ns1/testProduceAndConsumeAfterClientClose"; + std::string topicName = "testProduceAndConsumeAfterClientClose"; Client client(lookupUrl); Producer producer; @@ -410,18 +410,18 @@ TEST(BasicEndToEndTest, testProduceAndConsumeAfterClientClose) { TEST(BasicEndToEndTest, testIamSoFancyCharactersInTopicName) { Client client(lookupUrl); Producer producer; - Result result = client.createProducer("persistent://prop/unit/ns1/topic@%*)(&!%$#@#$><?", producer); + Result result = client.createProducer("persistent://public/default/topic@%*)(&!%$#@#$><?", producer); ASSERT_EQ(ResultOk, result); Consumer consumer; - result = client.subscribe("persistent://prop/unit/ns1/topic@%*)(&!%$#@#$><?", "my-sub-name", consumer); + result = client.subscribe("persistent://public/default/topic@%*)(&!%$#@#$><?", "my-sub-name", consumer); ASSERT_EQ(ResultOk, result); } TEST(BasicEndToEndTest, testSubscribeCloseUnsubscribeSherpaScenario) { ClientConfiguration config; Client client(lookupUrl, config); - std::string topicName = "persistent://prop/unit/ns1/::,::bf11"; + std::string topicName = "persistent://public/default/::,::bf11"; std::string subName = "weird-ass-characters-@%*)(&!%$#@#$><?)"; Producer producer; Result result = client.createProducer(topicName, producer); @@ -442,7 +442,7 @@ TEST(BasicEndToEndTest, testSubscribeCloseUnsubscribeSherpaScenario) { TEST(BasicEndToEndTest, testInvalidUrlPassed) { Client client("localhost:4080"); - std::string topicName = "persistent://prop/unit/ns1/testInvalidUrlPassed"; + std::string topicName = "testInvalidUrlPassed"; std::string subName = "test-sub"; Producer producer; Result result = client.createProducer(topicName, producer); @@ -467,10 +467,11 @@ TEST(BasicEndToEndTest, testInvalidUrlPassed) { TEST(BasicEndToEndTest, testPartitionedProducerConsumer) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/testPartitionedProducerConsumer"; + std::string topicName = "testPartitionedProducerConsumer"; // call admin api to make it partitioned - std::string url = adminUrl + "admin/persistent/prop/unit/ns/testPartitionedProducerConsumer/partitions"; + std::string url = + adminUrl + "admin/v2/persistent/public/default/testPartitionedProducerConsumer/partitions"; int res = makePutRequest(url, "3"); LOG_INFO("res = " << res); @@ -506,11 +507,12 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) { TEST(BasicEndToEndTest, testPartitionedProducerConsumerSubscriptionName) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/testPartitionedProducerConsumerSubscriptionName"; + std::string topicName = "testPartitionedProducerConsumerSubscriptionName"; // call admin api to make it partitioned std::string url = - adminUrl + "admin/persistent/prop/unit/ns/testPartitionedProducerConsumerSubscriptionName/partitions"; + adminUrl + + "admin/v2/persistent/public/default/testPartitionedProducerConsumerSubscriptionName/partitions"; int res = makePutRequest(url, "3"); LOG_INFO("res = " << res); @@ -531,7 +533,7 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumerSubscriptionName) { TEST(BasicEndToEndTest, testMessageTooBig) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/testMessageTooBig"; + std::string topicName = "testMessageTooBig"; Producer producer; Result result = client.createProducer(topicName, producer); ASSERT_EQ(ResultOk, result); @@ -554,7 +556,7 @@ TEST(BasicEndToEndTest, testMessageTooBig) { TEST(BasicEndToEndTest, testCompressionLZ4) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/namespace1/testCompressionLZ4"; + std::string topicName = "testCompressionLZ4"; std::string subName = "my-sub-name"; Producer producer; ProducerConfiguration conf; @@ -592,7 +594,7 @@ TEST(BasicEndToEndTest, testCompressionLZ4) { TEST(BasicEndToEndTest, testCompressionZLib) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/testCompressionZLib"; + std::string topicName = "testCompressionZLib"; std::string subName = "my-sub-name"; Producer producer; ProducerConfiguration conf; @@ -645,11 +647,11 @@ TEST(BasicEndToEndTest, testConfigurationFile) { TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/partition-testSinglePartitionRoutingPolicy"; + std::string topicName = "partition-testSinglePartitionRoutingPolicy"; // call admin api to make it partitioned std::string url = - adminUrl + "admin/persistent/prop/unit/ns/partition-testSinglePartitionRoutingPolicy/partitions"; + adminUrl + "admin/v2/persistent/public/default/partition-testSinglePartitionRoutingPolicy/partitions"; int res = makePutRequest(url, "5"); LOG_INFO("res = " << res); @@ -695,7 +697,7 @@ TEST(BasicEndToEndTest, testNamespaceName) { TEST(BasicEndToEndTest, testConsumerClose) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/testConsumerClose"; + std::string topicName = "testConsumerClose"; std::string subName = "my-sub-name"; Consumer consumer; ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer)); @@ -705,12 +707,12 @@ TEST(BasicEndToEndTest, testConsumerClose) { TEST(BasicEndToEndTest, testDuplicateConsumerCreationOnPartitionedTopic) { Client client(lookupUrl); - std::string topicName = - "persistent://prop/unit/ns/partition-testDuplicateConsumerCreationOnPartitionedTopic"; + std::string topicName = "partition-testDuplicateConsumerCreationOnPartitionedTopic"; // call admin api to make it partitioned std::string url = - adminUrl + "admin/persistent/prop/unit/ns/testDuplicateConsumerCreationOnPartitionedTopic/partitions"; + adminUrl + + "admin/v2/persistent/public/default/testDuplicateConsumerCreationOnPartitionedTopic/partitions"; int res = makePutRequest(url, "5"); LOG_INFO("res = " << res); @@ -755,10 +757,10 @@ TEST(BasicEndToEndTest, testDuplicateConsumerCreationOnPartitionedTopic) { TEST(BasicEndToEndTest, testRoundRobinRoutingPolicy) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/partition-testRoundRobinRoutingPolicy"; + std::string topicName = "persistent://public/default/partition-testRoundRobinRoutingPolicy"; // call admin api to make it partitioned std::string url = - adminUrl + "admin/persistent/prop/unit/ns/partition-testRoundRobinRoutingPolicy/partitions"; + adminUrl + "admin/v2/persistent/public/default/partition-testRoundRobinRoutingPolicy/partitions"; int res = makePutRequest(url, "5"); LOG_INFO("res = " << res); @@ -820,9 +822,10 @@ TEST(BasicEndToEndTest, testRoundRobinRoutingPolicy) { TEST(BasicEndToEndTest, testMessageListener) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/partition-testMessageListener"; + std::string topicName = "partition-testMessageListener"; // call admin api to make it partitioned - std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-testMessageListener/partitions"; + std::string url = + adminUrl + "admin/v2/persistent/public/default/partition-testMessageListener/partitions"; int res = makePutRequest(url, "5"); LOG_INFO("res = " << res); @@ -861,12 +864,11 @@ TEST(BasicEndToEndTest, testMessageListener) { TEST(BasicEndToEndTest, testMessageListenerPause) { Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/partition-testMessageListenerPause"; + std::string topicName = "partition-testMessageListenerPause"; // call admin api to make it partitioned std::string url = - adminUrl + - "admin/persistent/property/cluster/namespace/partition-testMessageListener-pauses/partitions"; + adminUrl + "admin/v2/persistent/public/default/partition-testMessageListener-pauses/partitions"; int res = makePutRequest(url, "5"); LOG_INFO("res = " << res); @@ -919,7 +921,7 @@ TEST(BasicEndToEndTest, testResendViaSendCallback) { ClientConfiguration clientConfiguration; clientConfiguration.setIOThreads(1); Client client(lookupUrl, clientConfiguration); - std::string topicName = "persistent://my-property/my-cluster/my-namespace/testResendViaListener"; + std::string topicName = "testResendViaListener"; Producer producer; @@ -953,7 +955,7 @@ TEST(BasicEndToEndTest, testStatsLatencies) { config.setMessageListenerThreads(1); config.setStatsIntervalInSeconds(5); Client client(lookupUrl, config); - std::string topicName = "persistent://property/cluster/namespace/testStatsLatencies"; + std::string topicName = "persistent://public/default/testStatsLatencies"; std::string subName = "subscription-name"; Producer producer; @@ -1066,7 +1068,7 @@ TEST(BasicEndToEndTest, testStatsLatencies) { TEST(BasicEndToEndTest, testProduceMessageSize) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/testProduceMessageSize"; + std::string topicName = "testProduceMessageSize"; std::string subName = "my-sub-name"; Producer producer1; Producer producer2; @@ -1117,7 +1119,7 @@ TEST(BasicEndToEndTest, testProduceMessageSize) { TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { Client client(adminUrl); - std::string topicName = "persistent://prop/unit/ns1/testHandlerReconnectionLogic"; + std::string topicName = "testHandlerReconnectionLogic"; Producer producer; Consumer consumer; @@ -1183,7 +1185,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { TEST(BasicEndToEndTest, testRSAEncryption) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/my-rsaenctopic"; + std::string topicName = "my-rsaenctopic"; std::string subName = "my-sub-name"; Producer producer; @@ -1241,7 +1243,7 @@ TEST(BasicEndToEndTest, testRSAEncryption) { TEST(BasicEndToEndTest, testEncryptionFailure) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/my-rsaencfailtopic"; + std::string topicName = "my-rsaencfailtopic"; std::string subName = "my-sub-name"; Producer producer; @@ -1355,7 +1357,7 @@ TEST(BasicEndToEndTest, testEncryptionFailure) { TEST(BasicEndToEndTest, testEventTime) { ClientConfiguration config; Client client(lookupUrl, config); - std::string topicName = "persistent://prop/unit/ns1/topic"; + std::string topicName = "test-event-time"; Producer producer; ProducerConfiguration producerConf; producerConf.setBatchingEnabled(true); @@ -1381,7 +1383,7 @@ TEST(BasicEndToEndTest, testEventTime) { TEST(BasicEndToEndTest, testSeek) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/testSeek"; + std::string topicName = "persistent://public/default/testSeek"; std::string subName = "sub-testSeek"; Producer producer; @@ -1448,7 +1450,7 @@ TEST(BasicEndToEndTest, testSeek) { TEST(BasicEndToEndTest, testUnAckedMessageTimeout) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/testUnAckedMessageTimeout"; + std::string topicName = "testUnAckedMessageTimeout"; std::string subName = "my-sub-name"; std::string content = "msg-content"; @@ -1488,7 +1490,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeout) { TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/testUnAckedMessageTimeoutListener"; + std::string topicName = "testUnAckedMessageTimeoutListener"; std::string subName = "my-sub-name"; std::string content = "msg-content"; @@ -1525,7 +1527,7 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerTopicNameInvalid) { topicNames.reserve(3); std::string subName = "testMultiTopicsTopicNameInvalid"; // cluster empty - std::string topicName1 = "persistent://prop/testMultiTopicsTopicNameInvalid"; + std::string topicName1 = "persistent://tenant/testMultiTopicsTopicNameInvalid"; // empty topics ASSERT_EQ(0, topicNames.size()); @@ -1560,9 +1562,9 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) { std::vector<std::string> topicNames; topicNames.reserve(3); std::string subName = "testMultiTopicsDifferentNamespace"; - std::string topicName1 = "persistent://prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1"; - std::string topicName2 = "persistent://prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2"; - std::string topicName3 = "persistent://prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3"; + std::string topicName1 = "persistent://public/default/testMultiTopicsConsumerDifferentNamespace1"; + std::string topicName2 = "persistent://public/default-2/testMultiTopicsConsumerDifferentNamespace2"; + std::string topicName3 = "persistent://public/default-3/testMultiTopicsConsumerDifferentNamespace3"; topicNames.push_back(topicName1); topicNames.push_back(topicName2); @@ -1570,11 +1572,13 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) { // call admin api to make topics partitioned std::string url1 = - adminUrl + "admin/persistent/prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1/partitions"; + adminUrl + "admin/v2/persistent/public/default/testMultiTopicsConsumerDifferentNamespace1/partitions"; std::string url2 = - adminUrl + "admin/persistent/prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2/partitions"; + adminUrl + + "admin/v2/persistent/public/default-2/testMultiTopicsConsumerDifferentNamespace2/partitions"; std::string url3 = - adminUrl + "admin/persistent/prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3/partitions"; + adminUrl + + "admin/v2/persistent/public/default-3/testMultiTopicsConsumerDifferentNamespace3/partitions"; int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); @@ -1604,10 +1608,10 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { std::vector<std::string> topicNames; topicNames.reserve(3); std::string subName = "testMultiTopicsConsumer"; - std::string topicName1 = "persistent://prop/unit/ns/testMultiTopicsConsumer1"; - std::string topicName2 = "persistent://prop/unit/ns/testMultiTopicsConsumer2"; - std::string topicName3 = "persistent://prop/unit/ns/testMultiTopicsConsumer3"; - std::string topicName4 = "persistent://prop/unit/ns/testMultiTopicsConsumer4"; + std::string topicName1 = "testMultiTopicsConsumer1"; + std::string topicName2 = "testMultiTopicsConsumer2"; + std::string topicName3 = "testMultiTopicsConsumer3"; + std::string topicName4 = "testMultiTopicsConsumer4"; topicNames.push_back(topicName1); topicNames.push_back(topicName2); @@ -1615,9 +1619,9 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) { topicNames.push_back(topicName4); // call admin api to make topics partitioned - std::string url1 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions"; - std::string url2 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer2/partitions"; - std::string url3 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer3/partitions"; + std::string url1 = adminUrl + "admin/v2/persistent/public/default/testMultiTopicsConsumer1/partitions"; + std::string url2 = adminUrl + "admin/v2/persistent/public/default/testMultiTopicsConsumer2/partitions"; + std::string url3 = adminUrl + "admin/v2/persistent/public/default/testMultiTopicsConsumer3/partitions"; int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); @@ -1727,24 +1731,24 @@ TEST(BasicEndToEndTest, testPatternTopicsConsumerInvalid) { // and only receive messages from matched topics. TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) { Client client(lookupUrl); - std::string pattern = "persistent://prop/unit/ns1/patternMultiTopicsConsumer.*"; + std::string pattern = "persistent://public/default/patternMultiTopicsConsumer.*"; std::string subName = "testPatternMultiTopicsConsumer"; - std::string topicName1 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub1"; - std::string topicName2 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub2"; - std::string topicName3 = "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub3"; + std::string topicName1 = "persistent://public/default/patternMultiTopicsConsumerPubSub1"; + std::string topicName2 = "persistent://public/default/patternMultiTopicsConsumerPubSub2"; + std::string topicName3 = "persistent://public/default/patternMultiTopicsConsumerPubSub3"; // This will not match pattern - std::string topicName4 = "persistent://prop/unit/ns1/patternMultiTopicsNotMatchPubSub4"; + std::string topicName4 = "persistent://public/default/patternMultiTopicsNotMatchPubSub4"; // call admin api to make topics partitioned std::string url1 = - adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub1/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsConsumerPubSub1/partitions"; std::string url2 = - adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub2/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsConsumerPubSub2/partitions"; std::string url3 = - adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub3/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsConsumerPubSub3/partitions"; std::string url4 = - adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsNotMatchPubSub4/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsNotMatchPubSub4/partitions"; int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); @@ -1843,7 +1847,7 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) { // and only receive messages from matched topics. TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { Client client(lookupUrl); - std::string pattern = "persistent://prop/unit/ns2/patternTopicsAutoConsumer.*"; + std::string pattern = "persistent://public/default/patternTopicsAutoConsumer.*"; Result result; std::string subName = "testPatternTopicsAutoConsumer"; @@ -1863,21 +1867,21 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { LOG_INFO("created pattern consumer with not match topics at beginning"); // 2. create 4 topics, in which 3 match the pattern. - std::string topicName1 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub1"; - std::string topicName2 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub2"; - std::string topicName3 = "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub3"; + std::string topicName1 = "persistent://public/default/patternTopicsAutoConsumerPubSub1"; + std::string topicName2 = "persistent://public/default/patternTopicsAutoConsumerPubSub2"; + std::string topicName3 = "persistent://public/default/patternTopicsAutoConsumerPubSub3"; // This will not match pattern - std::string topicName4 = "persistent://prop/unit/ns2/patternMultiTopicsNotMatchPubSub4"; + std::string topicName4 = "persistent://public/default/patternMultiTopicsNotMatchPubSub4"; // call admin api to make topics partitioned std::string url1 = - adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub1/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub1/partitions"; std::string url2 = - adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub2/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub2/partitions"; std::string url3 = - adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub3/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub3/partitions"; std::string url4 = - adminUrl + "admin/persistent/prop/unit/ns2/patternMultiTopicsNotMatchPubSub4/partitions"; + adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsNotMatchPubSub4/partitions"; int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); @@ -1964,7 +1968,7 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { TEST(BasicEndToEndTest, testSyncFlushBatchMessages) { ClientConfiguration config; Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/test-flush-batch-messages"; + std::string topicName = "test-flush-batch-messages-" + boost::lexical_cast<std::string>(time(NULL)); std::string subName = "subscription-name"; Producer producer; @@ -2013,7 +2017,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) { // message not reached max batch number, should not receive any data. Message receivedMsg; - ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 5000)); + ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 1000)); // Send Asynchronously of the other half the messages for (int i = numOfMessages / 2; i < numOfMessages; i++) { @@ -2027,19 +2031,21 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) { } LOG_INFO("sending the other half messages in async, should able to receive"); // message not reached max batch number, should received the messages - ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000)); + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 1000)); + LOG_INFO("Receive all messages"); // receive all the messages. int i = 1; - while (consumer.receive(receivedMsg, 5000) == ResultOk) { + while (consumer.receive(receivedMsg, 1000) == ResultOk) { std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i); - LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = " - << receivedMsg.getMessageId() << "]"); + LOG_INFO("Received Message with [ content - " + << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"); ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++)); ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString()); ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); } + LOG_INFO("Last sync send round"); // Send sync of half the messages, this will triggerFlush, and could get the messages. prefix = "msg-batch-sync"; for (int i = 0; i < numOfMessages / 2; i++) { @@ -2049,10 +2055,10 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) { .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) .build(); producer.send(msg); - LOG_DEBUG("sync sending message " << messageContent); + LOG_INFO("sync sending message " << messageContent); } // message not reached max batch number, should received the messages, and not timeout - ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000)); + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 1000)); producer.close(); client.shutdown(); @@ -2065,10 +2071,10 @@ static void simpleCallback(Result code, const Message& msg) { TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/partition-testSyncFlushBatchMessages"; + std::string topicName = "persistent://public/default/partition-testSyncFlushBatchMessages"; // call admin api to make it partitioned std::string url = - adminUrl + "admin/persistent/prop/unit/ns/partition-testSyncFlushBatchMessages/partitions"; + adminUrl + "admin/v2/persistent/public/default/partition-testSyncFlushBatchMessages/partitions"; int res = makePutRequest(url, "5"); int numberOfPartitions = 5; diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc index 894a582..d142b63 100644 --- a/pulsar-client-cpp/tests/BatchMessageTest.cc +++ b/pulsar-client-cpp/tests/BatchMessageTest.cc @@ -41,8 +41,8 @@ using namespace pulsar; static int globalTestBatchMessagesCounter = 0; static int globalCount = 0; -static std::string lookupUrl = "pulsar://localhost:8885"; -static std::string adminUrl = "http://localhost:8765/"; +static std::string lookupUrl = "pulsar://localhost:6650"; +static std::string adminUrl = "http://localhost:8080/"; // ecpoch time in seconds long epochTime = time(NULL); @@ -87,7 +87,7 @@ TEST(BatchMessageTest, testProducerTimeout) { clientConf.setStatsIntervalInSeconds(1); Client client(lookupUrl, clientConf); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Producer producer; @@ -172,7 +172,7 @@ TEST(BatchMessageTest, testBatchSizeInBytes) { globalTestBatchMessagesCounter = 0; Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Producer producer; @@ -256,7 +256,7 @@ TEST(BatchMessageTest, testSmallReceiverQueueSize) { clientConf.setStatsIntervalInSeconds(20); Client client(lookupUrl, clientConf); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Producer producer; @@ -354,7 +354,7 @@ TEST(BatchMessageTest, testIndividualAck) { clientConfig.setStatsIntervalInSeconds(1); Client client(lookupUrl, clientConfig); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Producer producer; @@ -509,7 +509,7 @@ TEST(BatchMessageTest, testCumulativeAck) { clientConfig.setStatsIntervalInSeconds(100); Client client(lookupUrl, clientConfig); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Producer producer; @@ -635,7 +635,7 @@ TEST(BatchMessageTest, testMixedAck) { std::string testName = boost::lexical_cast<std::string>(epochTime) + "testMixedAck"; Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Producer producer; @@ -743,7 +743,7 @@ TEST(BatchMessageTest, testPermits) { std::string testName = boost::lexical_cast<std::string>(epochTime) + "testPermits"; Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Producer producer; @@ -855,12 +855,11 @@ TEST(BatchMessageTest, testPermits) { TEST(BatchMessageTest, testPartitionedTopics) { Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/test-partitioned-batch-messages-" + + std::string topicName = "persistent://public/default/test-partitioned-batch-messages-" + boost::lexical_cast<std::string>(epochTime); // call admin api to make it partitioned - std::string url = adminUrl + - "admin/persistent/property/cluster/namespace/test-partitioned-batch-messages-" + + std::string url = adminUrl + "admin/v2/persistent/public/default/test-partitioned-batch-messages-" + boost::lexical_cast<std::string>(epochTime) + "/partitions"; int res = makePutRequest(url, "7"); diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc index f706eb8..8fd0354 100644 --- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc +++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc @@ -36,14 +36,13 @@ using namespace pulsar; TEST(BinaryLookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = boost::make_shared<ExecutorServiceProvider>(1); AuthenticationPtr authData = AuthFactory::Disabled(); - std::string url = "pulsar://localhost:8885"; + std::string url = "pulsar://localhost:6650"; ClientConfiguration conf; ExecutorServiceProviderPtr ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(1)); ConnectionPool pool_(conf, ioExecutorProvider_, authData, true); BinaryProtoLookupService lookupService(pool_, url); - std::string topic = "persistent://prop/unit/ns1/topic"; - TopicNamePtr topicName = TopicName::get(topic); + TopicNamePtr topicName = TopicName::get("topic"); Future<Result, LookupDataResultPtr> partitionFuture = lookupService.getPartitionMetadataAsync(topicName); LookupDataResultPtr lookupData; @@ -51,8 +50,7 @@ TEST(BinaryLookupServiceTest, basicLookup) { ASSERT_TRUE(lookupData != NULL); ASSERT_EQ(0, lookupData->getPartitions()); - Future<Result, LookupDataResultPtr> future = - lookupService.lookupAsync("persistent://prop/unit/ns1/topic"); + Future<Result, LookupDataResultPtr> future = lookupService.lookupAsync("topic"); result = future.get(lookupData); ASSERT_EQ(ResultOk, result); @@ -61,22 +59,22 @@ TEST(BinaryLookupServiceTest, basicLookup) { } TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) { - std::string url = "pulsar://localhost:8885"; - std::string adminUrl = "http://localhost:8765/"; + std::string url = "pulsar://localhost:6650"; + std::string adminUrl = "http://localhost:8080/"; Result result; // 1. create some topics under same namespace Client client(url); - std::string topicName1 = "persistent://prop/unit/ns4/basicGetNamespaceTopics1"; - std::string topicName2 = "persistent://prop/unit/ns4/basicGetNamespaceTopics2"; - std::string topicName3 = "persistent://prop/unit/ns4/basicGetNamespaceTopics3"; + std::string topicName1 = "persistent://public/default/basicGetNamespaceTopics1"; + std::string topicName2 = "persistent://public/default/basicGetNamespaceTopics2"; + std::string topicName3 = "persistent://public/default/basicGetNamespaceTopics3"; // This is not in same namespace. - std::string topicName4 = "persistent://prop/unit/ns2/basicGetNamespaceTopics4"; + std::string topicName4 = "persistent://public/default-2/basicGetNamespaceTopics4"; // call admin api to make topics partitioned - std::string url1 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics1/partitions"; - std::string url2 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics2/partitions"; - std::string url3 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics3/partitions"; + std::string url1 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics1/partitions"; + std::string url2 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics2/partitions"; + std::string url3 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics3/partitions"; int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); @@ -116,10 +114,10 @@ TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) { ASSERT_TRUE(topicsData != NULL); // 3. verify result contains first 3 topic - ASSERT_EQ(topicsData->size(), 3); ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) != topicsData->end()); ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) != topicsData->end()); ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) != topicsData->end()); + ASSERT_FALSE(std::find(topicsData->begin(), topicsData->end(), topicName4) != topicsData->end()); client.shutdown(); } diff --git a/pulsar-client-cpp/tests/ClientDeduplicationTest.cc b/pulsar-client-cpp/tests/ClientDeduplicationTest.cc index b235040..c7beeb0 100644 --- a/pulsar-client-cpp/tests/ClientDeduplicationTest.cc +++ b/pulsar-client-cpp/tests/ClientDeduplicationTest.cc @@ -27,24 +27,31 @@ using namespace pulsar; -static std::string serviceUrl = "pulsar://localhost:8885"; -static std::string adminUrl = "http://localhost:8765/"; +static std::string serviceUrl = "pulsar://localhost:6650"; +static std::string adminUrl = "http://localhost:8080/"; TEST(ClientDeduplicationTest, testProducerSequenceAfterReconnect) { Client client(serviceUrl); - std::string topicName = "persistent://sample/standalone/ns-dedup-1/testProducerSequenceAfterReconnect-" + + std::string topicName = "persistent://public/dedup-1/testProducerSequenceAfterReconnect-" + boost::lexical_cast<std::string>(time(NULL)); // call admin api to create namespace and enable deduplication - std::string url = adminUrl + "admin/namespaces/sample/standalone/ns-dedup-1"; - int res = makePutRequest(url, ""); + std::string url = adminUrl + "admin/v2/namespaces/public/dedup-1"; + int res = makePutRequest(url, R"({"replication_clusters": ["standalone"]})"); ASSERT_TRUE(res == 204 || res == 409); - url = adminUrl + "admin/namespaces/sample/standalone/ns-dedup-1/deduplication"; + url = adminUrl + "admin/v2/namespaces/public/dedup-1/permissions/anonymous"; + res = makePostRequest(url, R"(["produce","consume"])"); + ASSERT_TRUE(res == 204 || res == 409); + + url = adminUrl + "admin/v2/namespaces/public/dedup-1/deduplication"; res = makePostRequest(url, "true"); ASSERT_TRUE(res == 204 || res == 409); + // Ensure dedup status was refreshed + sleep(1); + ReaderConfiguration readerConf; Reader reader; ASSERT_EQ(client.createReader(topicName, MessageId::earliest(), readerConf, reader), ResultOk); @@ -79,17 +86,20 @@ TEST(ClientDeduplicationTest, testProducerSequenceAfterReconnect) { } TEST(ClientDeduplicationTest, testProducerDeduplication) { - Client client(serviceUrl); + Client client(adminUrl); - std::string topicName = "persistent://sample/standalone/ns-dedup-2/testProducerDeduplication-" + + std::string topicName = "persistent://public/dedup-2/testProducerDeduplication-" + boost::lexical_cast<std::string>(time(NULL)); - // call admin api to create namespace and enable deduplication - std::string url = adminUrl + "admin/namespaces/sample/standalone/ns-dedup-2"; - int res = makePutRequest(url, ""); + std::string url = adminUrl + "admin/v2/namespaces/public/dedup-2"; + int res = makePutRequest(url, R"({"replication_clusters": ["standalone"]})"); + ASSERT_TRUE(res == 204 || res == 409); + + url = adminUrl + "admin/v2/namespaces/public/dedup-2/permissions/anonymous"; + res = makePostRequest(url, R"(["produce","consume"])"); ASSERT_TRUE(res == 204 || res == 409); - url = adminUrl + "admin/namespaces/sample/standalone/ns-dedup-2/deduplication"; + url = adminUrl + "admin/v2/namespaces/public/dedup-2/deduplication"; res = makePostRequest(url, "true"); ASSERT_TRUE(res == 204 || res == 409); diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 5bc0246..d4eaca7 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -23,7 +23,7 @@ using namespace pulsar; -static std::string lookupUrl = "pulsar://localhost:8885"; +static std::string lookupUrl = "pulsar://localhost:6650"; TEST(ClientTest, testChecksumComputation) { std::string data = "test"; diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc index cede638..e4944ec 100644 --- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc +++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc @@ -25,8 +25,8 @@ using namespace pulsar; TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) { - std::string lookupUrl = "pulsar://localhost:8885"; - std::string topicName = "persistent://prop/unit/ns1/persist-topic"; + std::string lookupUrl = "pulsar://localhost:6650"; + std::string topicName = "persist-topic"; std::string subName = "test-persist-exclusive"; Result result; @@ -45,8 +45,8 @@ TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) { } TEST(ConsumerConfigurationTest, testReadCompactPersistentFailover) { - std::string lookupUrl = "pulsar://localhost:8885"; - std::string topicName = "persistent://prop/unit/ns1/persist-topic"; + std::string lookupUrl = "pulsar://localhost:6650"; + std::string topicName = "persist-topic"; std::string subName = "test-persist-fail-over"; Result result; @@ -65,8 +65,8 @@ TEST(ConsumerConfigurationTest, testReadCompactPersistentFailover) { } TEST(ConsumerConfigurationTest, testReadCompactPersistentShared) { - std::string lookupUrl = "pulsar://localhost:8885"; - std::string topicName = "persistent://prop/unit/ns1/persist-topic"; + std::string lookupUrl = "pulsar://localhost:6650"; + std::string topicName = "persist-topic"; std::string subName = "test-persist-shared"; Result result; @@ -85,8 +85,8 @@ TEST(ConsumerConfigurationTest, testReadCompactPersistentShared) { } TEST(ConsumerConfigurationTest, testReadCompactNonPersistentExclusive) { - std::string lookupUrl = "pulsar://localhost:8885"; - std::string topicName = "non-persistent://prop/unit/ns1/testNonPersistentTopic"; + std::string lookupUrl = "pulsar://localhost:6650"; + std::string topicName = "non-persistent://public/default/testNonPersistentTopic"; std::string subName = "test-non-persist-exclusive"; Result result; diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc index 6843027..585d664 100644 --- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc +++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc @@ -36,8 +36,8 @@ DECLARE_LOG_OBJECT(); using namespace pulsar; -static std::string lookupUrl = "http://localhost:8765"; -static std::string adminUrl = "http://localhost:8765/"; +static std::string lookupUrl = "pulsar://localhost:6650"; +static std::string adminUrl = "http://localhost:8080/"; void partitionedCallbackFunction(Result result, BrokerConsumerStats brokerConsumerStats, long expectedBacklog, Latch& latch, int index) { @@ -60,7 +60,7 @@ TEST(ConsumerStatsTest, testBacklogInfo) { long epochTime = time(NULL); std::string testName = "testBacklogInfo-" + boost::lexical_cast<std::string>(epochTime); Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; ConsumerConfiguration conf; conf.setBrokerConsumerStatsCacheTimeInMs(3 * 1000); @@ -115,7 +115,7 @@ TEST(ConsumerStatsTest, testFailure) { long epochTime = time(NULL); std::string testName = "testFailure-" + boost::lexical_cast<std::string>(epochTime); Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; Consumer consumer; Promise<Result, Consumer> consumerPromise; @@ -160,7 +160,7 @@ TEST(ConsumerStatsTest, testCachingMechanism) { long epochTime = time(NULL); std::string testName = "testCachingMechanism-" + boost::lexical_cast<std::string>(epochTime); Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; ConsumerConfiguration conf; conf.setBrokerConsumerStatsCacheTimeInMs(3.5 * 1000); @@ -235,11 +235,11 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) { long epochTime = time(NULL); std::string testName = "testAsyncCallOnPartitionedTopic-" + boost::lexical_cast<std::string>(epochTime); Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string topicName = "persistent://public/default/" + testName; std::string subName = "subscription-name"; // call admin api to create partitioned topics - std::string url = adminUrl + "admin/persistent/property/cluster/namespace/" + testName + "/partitions"; + std::string url = adminUrl + "admin/v2/persistent/public/default/" + testName + "/partitions"; int res = makePutRequest(url, "7"); LOG_INFO("res = " << res); diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc index fd6cf5d..0ca1f75 100644 --- a/pulsar-client-cpp/tests/ReaderTest.cc +++ b/pulsar-client-cpp/tests/ReaderTest.cc @@ -28,12 +28,12 @@ DECLARE_LOG_OBJECT() using namespace pulsar; -static std::string serviceUrl = "pulsar://localhost:8885"; +static std::string serviceUrl = "pulsar://localhost:6650"; TEST(ReaderTest, testSimpleReader) { Client client(serviceUrl); - std::string topicName = "persistent://property/cluster/namespace/test-simple-reader"; + std::string topicName = "persistent://public/default/test-simple-reader"; ReaderConfiguration readerConf; Reader reader; @@ -65,7 +65,7 @@ TEST(ReaderTest, testSimpleReader) { TEST(ReaderTest, testReaderAfterMessagesWerePublished) { Client client(serviceUrl); - std::string topicName = "persistent://property/cluster/namespace/testReaderAfterMessagesWerePublished"; + std::string topicName = "persistent://public/default/testReaderAfterMessagesWerePublished"; Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -97,7 +97,7 @@ TEST(ReaderTest, testReaderAfterMessagesWerePublished) { TEST(ReaderTest, testMultipleReaders) { Client client(serviceUrl); - std::string topicName = "persistent://property/cluster/namespace/testMultipleReaders"; + std::string topicName = "persistent://public/default/testMultipleReaders"; Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -142,7 +142,7 @@ TEST(ReaderTest, testMultipleReaders) { TEST(ReaderTest, testReaderOnLastMessage) { Client client(serviceUrl); - std::string topicName = "persistent://property/cluster/namespace/testReaderOnLastMessage"; + std::string topicName = "persistent://public/default/testReaderOnLastMessage"; Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -180,7 +180,7 @@ TEST(ReaderTest, testReaderOnLastMessage) { TEST(ReaderTest, testReaderOnSpecificMessage) { Client client(serviceUrl); - std::string topicName = "persistent://property/cluster/namespace/testReaderOnSpecificMessage"; + std::string topicName = "persistent://public/default/testReaderOnSpecificMessage"; Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -231,7 +231,7 @@ TEST(ReaderTest, testReaderOnSpecificMessage) { TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) { Client client(serviceUrl); - std::string topicName = "persistent://property/cluster/namespace/testReaderOnSpecificMessageWithBatches"; + std::string topicName = "persistent://public/default/testReaderOnSpecificMessageWithBatches"; Producer producer; // Enable batching @@ -291,7 +291,7 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) { TEST(ReaderTest, testReaderReachEndOfTopic) { Client client(serviceUrl); - std::string topicName = "persistent://property/cluster/namespace/testReaderReachEndOfTopic"; + std::string topicName = "persistent://public/default/testReaderReachEndOfTopic"; // 1. create producer Producer producer; @@ -366,8 +366,7 @@ TEST(ReaderTest, testReaderReachEndOfTopic) { TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { Client client(serviceUrl); - std::string topicName = - "persistent://property/cluster/namespace/testReaderReachEndOfTopicMessageWithBatches"; + std::string topicName = "persistent://public/default/testReaderReachEndOfTopicMessageWithBatches"; // 1. create producer Producer producer; diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc index e1cd1fc..890cee6 100644 --- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc +++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc @@ -26,7 +26,7 @@ using namespace pulsar; static int totalMessages = 10; static int globalCount = 0; -static std::string lookupUrl = "pulsar://localhost:8885"; +static std::string lookupUrl = "pulsar://localhost:6650"; static std::string contentBase = "msg-"; static void messageListenerFunction(Consumer consumer, const Message& msg, Latch& latch) { @@ -41,7 +41,7 @@ static void messageListenerFunction(Consumer consumer, const Message& msg, Latch TEST(ZeroQueueSizeTest, testProduceConsume) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns1/zero-queue-size"; + std::string topicName = "zero-queue-size"; std::string subName = "my-sub-name"; Producer producer; @@ -80,7 +80,7 @@ TEST(ZeroQueueSizeTest, testProduceConsume) { TEST(ZeroQueueSizeTest, testMessageListener) { Client client(lookupUrl); - std::string topicName = "persistent://prop/unit/ns/zero-queue-size-listener"; + std::string topicName = "zero-queue-size-listener"; std::string subName = "my-sub-name"; Producer producer; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index ab9686c..8a1e258 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -114,10 +114,6 @@ public class PerformanceConsumer { public String authParams; @Parameter(names = { - "--use-tls" }, description = "Use TLS encryption on the connection") - public boolean useTls; - - @Parameter(names = { "--trust-cert-file" }, description = "Path for the trusted TLS certificate file") public String tlsTrustCertsFilePath = ""; @@ -178,10 +174,6 @@ public class PerformanceConsumer { arguments.authParams = prop.getProperty("authParams", null); } - if (arguments.useTls == false) { - arguments.useTls = Boolean.parseBoolean(prop.getProperty("useTls")); - } - if (isBlank(arguments.tlsTrustCertsFilePath)) { arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", ""); } @@ -218,7 +210,6 @@ public class PerformanceConsumer { .connectionsPerBroker(arguments.maxConnections) // .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) // .ioThreads(Runtime.getRuntime().availableProcessors()) // - .enableTls(arguments.useTls) // .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); if (isNotBlank(arguments.authPluginClassName)) { clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index bfd4fbb..6967d43 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -140,10 +140,6 @@ public class PerformanceProducer { public double warmupTimeSeconds = 1.0; @Parameter(names = { - "--use-tls" }, description = "Use TLS encryption on the connection") - public boolean useTls; - - @Parameter(names = { "--trust-cert-file" }, description = "Path for the trusted TLS certificate file") public String tlsTrustCertsFilePath = ""; @@ -206,10 +202,6 @@ public class PerformanceProducer { arguments.authParams = prop.getProperty("authParams", null); } - if (arguments.useTls == false) { - arguments.useTls = Boolean.parseBoolean(prop.getProperty("useTls")); - } - if (isBlank(arguments.tlsTrustCertsFilePath)) { arguments.tlsTrustCertsFilePath = prop.getProperty("tlsTrustCertsFilePath", ""); } @@ -237,7 +229,6 @@ public class PerformanceProducer { .connectionsPerBroker(arguments.maxConnections) // .ioThreads(Runtime.getRuntime().availableProcessors()) // .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) // - .enableTls(arguments.useTls) // .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); if (isNotBlank(arguments.authPluginClassName)) {