2020-03-10 11:23:19 UTC - Vladimir Shchur: Hi! I'm trying to make a dynamic function whose output topic should depend on the input topic (since I specify the input topic as a pattern). It should be possible <https://pulsar.apache.org/docs/en/functions-deploy/#default-arguments|based on documentation>, but it doesn't seem to work for me, and <https://pulsar.apache.org/docs/en/pulsar-admin/#create-1|another documentation page> states _The function's output topic (If none is specified, no output is written)._ Does it mean that the output topic can't be dynamic? ---- 2020-03-10 12:23:34 UTC - Jan-Pieter George: Thanks. I think there's a benefit to making this a broker feature as well on top of this; trusting clients to be perfect and handle all potential network and crash conditions in such a way that if anything happens it stops heartbeating feels a bit off. What would be the best way to raise that discussion: create an issue at <https://github.com/apache/pulsar>? ---- 2020-03-10 12:25:31 UTC - Jan-Pieter George: <https://github.com/apache/pulsar/issues/4861> does seem to suggest there is an ackTimeout the broker manages outside of reconnected consumers. ---- 2020-03-10 15:23:18 UTC - Mathieu Druart: @Vladimir Shchur you can specify the output topic of your function dynamically with the context: ---- 2020-03-10 15:23:39 UTC - Mathieu Druart: ```context.newOutputMessage("persistent://public/default/" + topicName, Schema.INT64) .key(heureCourante) .value(context.getCounter(counterName)) .send();``` ---- 2020-03-10 15:26:01 UTC - Mathieu Druart: the return type of the `process` method should be `Void` ---- 2020-03-10 15:26:56 UTC - Vladimir Shchur: @Mathieu Druart thank you, I'll try this workaround, but wanted to use output topic as a cleaner approach. The documentation is misleading :disappointed: ---- 2020-03-10 15:29:18 UTC - Vladimir Shchur: How do I get topicName in your example? 
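If the output topic is meant to be derived from whichever input topic a message arrived on, the naming rule itself can be kept in a small pure helper. The following is only a sketch: the class `OutputTopics`, the method `outputTopicFor`, and the `-output` suffix are hypothetical choices for illustration, not part of the Pulsar API. It takes an `Optional<String>` on the assumption that the input topic name may be unavailable for a given record.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of the Pulsar API.
final class OutputTopics {
    private OutputTopics() {}

    // Derives an output topic name by appending "-output" to the input topic name.
    // Returns an empty Optional when the input topic name is missing or empty.
    static Optional<String> outputTopicFor(Optional<String> inputTopic) {
        return inputTopic
                .filter(name -> !name.isEmpty())
                .map(name -> name + "-output");
    }
}
```

Inside a function, the resulting name could then be passed as the topic argument of `context.newOutputMessage(...)`, as in Mathieu's snippet above, with the function itself returning `Void`.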
---- 2020-03-10 15:29:32 UTC - Mathieu Druart: actually I never tried to use `{input topic}-{function name}-output` as an output topic name ---- 2020-03-10 15:30:41 UTC - Mathieu Druart: in my example topicName is calculated in the process method ---- 2020-03-10 15:32:04 UTC - Vladimir Shchur: I'd like the output topic to depend on the input topic name, like `{inputTopicName}-output`, while the input topic name can be any that matches the input topic regex pattern. Is it achievable? ---- 2020-03-10 15:34:09 UTC - Andy Papia: So it looks like the bookkeepers are fine now but the brokers can't connect to them: ```15:32:13.169 [bookkeeper-io-14-3] ERROR org.apache.bookkeeper.proto.PerChannelBookieClient - Could not connect to bookie: [id: 0xcbbef535]/100.96.19.2:3181, current state CONNECTING : io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: No route to host: /100.96.19.2:3181``` ---- 2020-03-10 15:34:44 UTC - Andy Papia: Shouldn't the brokers be getting the hostname addresses through Zookeeper? ---- 2020-03-10 15:35:04 UTC - Mathieu Druart: maybe you can get the input topic name with `context.getCurrentRecord().getTopicName()` ? +1 : Vladimir Shchur ---- 2020-03-10 15:39:06 UTC - Vladimir Shchur: Looks feasible, will try, thank you! ---- 2020-03-10 15:40:25 UTC - Mathieu Druart: I don't know if it's the best way to achieve this, but you're welcome ---- 2020-03-10 15:42:46 UTC - Bobby: when creating a client with go, is there a way to specify namespace? I'm not seeing it in the producer or client creation docs online, could be looking in the wrong place though ---- 2020-03-10 15:43:31 UTC - Matteo Merli: The topic name will include the namespace. eg. `my-tenant/my-namespace/my-topic` ---- 2020-03-10 15:43:44 UTC - Bobby: oh gotcha ---- 2020-03-10 15:47:44 UTC - Vladimir Shchur: If there is no other way to achieve it, then any way is the best :slightly_smiling_face: ---- 2020-03-10 15:52:34 UTC - Bobby: does anyone know what this means? 
``` 2020/03/10 09:46:40.706 c_client.go:68: [info] WARN | ClientConnection:925 | [10.169.240.171:58836 -> 172.27.223.166:6650] Received error response from server: 11 -- req_id: 0``` ---- 2020-03-10 15:56:51 UTC - Bobby: does pulsar need to be configured for persistence for this to work? ---- 2020-03-10 15:58:04 UTC - Andy Papia: ok I had to delete the ZK persistent volumes as well. It looks like it might be up now. ---- 2020-03-10 15:59:31 UTC - Chris Bartholomew: Token-based authentication works. You just need to add the header: ```Authorization: Bearer <token>```
---- 2020-03-10 16:08:56 UTC - Liam Condon: I assume you're trying to publish to a persistent topic? 11 is likely a BrokerPersistenceError ---- 2020-03-10 16:09:41 UTC - Bobby: yeah that's what i was attempting. That's configured on the broker though right? I was publishing via pulsar-admin ok to this same cluster/namespace ---- 2020-03-10 16:15:31 UTC - Liam Condon: yes - the broker needs to have `enablePersistentTopics=true` in its `broker.conf` ---- 2020-03-10 16:21:14 UTC - Bobby: ok cool, thank you ---- 2020-03-10 16:23:23 UTC - Alexander Ursu: I've tried creating a simple MySQL sink connector as explained here (<https://pulsar.apache.org/docs/en/io-quickstart/#connect-pulsar-to-mysql>), but it seems that my connector just keeps restarting, and I can't seem to find any useful logs to tell me why ---- 2020-03-10 16:30:09 UTC - Sijie Guo: Did you check the function log file? ---- 2020-03-10 16:30:47 UTC - Alexander Ursu: How would I access that? ---- 2020-03-10 16:41:38 UTC - Sijie Guo: Are you running standalone? 
If so, you can access the log file in logs/<tenant>/<namespace>/<connector-name> ---- 2020-03-10 16:41:56 UTC - Alexander Ursu: clustered ---- 2020-03-10 17:00:57 UTC - Alexander Ursu: Found it, I believe these are the most recent logs: ---- 2020-03-10 17:00:57 UTC - Alexander Ursu: ```componentType: SINK 16:58:58.703 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Load JAR: /pulsar/download/pulsar_functions/public/default/pulsar-mysql-jdbc-sink/0/pulsar-io-jdbc-2.5.0.nar 16:58:58.972 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Initialize function class loader for function pulsar-mysql-jdbc-sink at function cache manager 16:58:59.583 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.io.jdbc.JdbcAbstractSink - Opened jdbc connection: jdbc:mysql://mysql:3306/pulsar_mysql_jdbc_sink, autoCommit: false 16:58:59.634 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.source.PulsarSource - Opening pulsar source with config: PulsarSourceConfig(processingGuarantees=ATLEAST_ONCE, subscriptionType=Shared, subscriptionName=public/default/pulsar-mysql-jdbc-sink, subscriptionPosition=Latest, maxMessageRetries=-1, deadLetterTopic=null, topicSchema={ftx=ConsumerConfig(schemaType=null, serdeClassName=null, isRegexPattern=false, receiverQueueSize=null)}, typeClassName=org.apache.pulsar.client.api.schema.GenericRecord, timeoutMs=null) 16:58:59.641 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.source.PulsarSource - Creating consumers for topic : ftx, schema : org.apache.pulsar.client.impl.schema.AutoConsumeSchema@1386034d 16:58:59.866 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xa54de7af, L:/10.0.3.44:36866 - R:475eab276fee/10.0.3.44:6650]] Connected to server 16:58:59.900 [pulsar-client-io-1-1] INFO 
org.apache.pulsar.client.impl.PulsarClientImpl - Configuring schema for topic ftx : { "name": "public/default/ftx", "schema": { "type": "record", "name": "Test", "fields": [ { "name": "market", "type": [ "null", "string" ] } ] }, "type": "AVRO", "properties": {} } 16:59:00.063 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema for topic ftx : {"type":"record","name":"Test","fields":[{"name":"market","type":["null","string"]}]} 16:59:00.281 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: { "topicNames" : [ "ftx" ], "topicsPattern" : null, "subscriptionName" : "public/default/pulsar-mysql-jdbc-sink", "subscriptionType" : "Shared", "receiverQueueSize" : 1000, "acknowledgementsGroupTimeMicros" : 100000, "negativeAckRedeliveryDelayMicros" : 60000000, "maxTotalReceiverQueueSizeAcrossPartitions" : 50000, "consumerName" : null, "ackTimeoutMillis" : 0, "tickDurationMillis" : 1000, "priorityLevel" : 0, "cryptoFailureAction" : "CONSUME", "properties" : { "application" : "pulsar-sink", "id" : "public/default/pulsar-mysql-jdbc-sink", "instance_id" : "0" }, "readCompacted" : false, "subscriptionInitialPosition" : "Latest", "patternAutoDiscoveryPeriod" : 1, "regexSubscriptionMode" : "PersistentOnly", "deadLetterPolicy" : null, "autoUpdatePartitions" : true, "replicateSubscriptionState" : false, "resetIncludeHead" : false, "keySharedPolicy" : null } 16:59:00.285 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: { "serviceUrl" : "pulsar://475eab276fee:6650", "authPluginClassName" : null, "authParams" : null, "operationTimeoutMs" : 30000, "statsIntervalSeconds" : 60, "numIoThreads" : 1, "numListenerThreads" : 1, "connectionsPerBroker" : 1, "useTcpNoDelay" : true, "useTls" : false, "tlsTrustCertsFilePath" : null, "tlsAllowInsecureConnection" : true, 
"tlsHostnameVerificationEnable" : false, "concurrentLookupRequest" : 5000, "maxLookupRequest" : 50000, "maxNumberOfRejectedRequestPerConnection" : 50, "keepAliveIntervalSeconds" : 30, "connectionTimeoutMs" : 10000, "requestTimeoutMs" : 60000, "initialBackoffIntervalNanos" : 100000000, "maxBackoffIntervalNanos" : 60000000000 } 16:59:00.298 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [ftx][public/default/pulsar-mysql-jdbc-sink] Subscribing to topic on cnx [id: 0xa54de7af, L:/10.0.3.44:36866 - R:475eab276fee/10.0.3.44:6650] 16:59:00.315 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [ftx][public/default/pulsar-mysql-jdbc-sink] Subscribed to topic on 475eab276fee/10.0.3.44:6650 -- consumer: 0 16:59:00.336 [pulsar-client-io-1-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized 16:59:00.369 [public/default/pulsar-mysql-jdbc-sink-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/pulsar-mysql-jdbc-sink:0] Uncaught exception in Java Instance java.lang.NullPointerException: null at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[java-instance.jar:?] at com.google.common.cache.LocalCache.get(LocalCache.java:3950) ~[java-instance.jar:?] at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-instance.jar:?] at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-instance.jar:?] 
at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:98) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0] at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:96) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0] at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:39) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0] at org.apache.pulsar.client.api.Schema.decode(Schema.java:95) ~[java-instance.jar:?] at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:273) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0] at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:2.5.0] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:472) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?] 
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232] 16:59:00.376 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance 16:59:00.421 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [ftx] [public/default/pulsar-mysql-jdbc-sink] Closed consumer 16:59:00.423 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.io.jdbc.JdbcAbstractSink - Closed jdbc connection: jdbc:mysql://mysql:3306/pulsar_mysql_jdbc_sink 16:59:00.426 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Unloading JAR files for function InstanceConfig(instanceId=0, functionId=880544b4-e16e-4147-8064-e4eb33054ef6, functionVersion=e07d4f13-3998-4eed-b45f-d286b732dbc2, functionDetails=tenant: "public" namespace: "default" name: "pulsar-mysql-jdbc-sink" className: "org.apache.pulsar.functions.api.utils.IdentityFunction" autoAck: true parallelism: 1 source { typeClassName: "org.apache.pulsar.client.api.schema.GenericRecord" inputSpecs { key: "ftx" value { } } cleanupSubscription: true } sink { className: "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink" configs: "{\"userName\":\"root\",\"password\":\"example\",\"jdbcUrl\":\"jdbc:mysql://mysql:3306/pulsar_mysql_jdbc_sink\",\"tableName\":\"pulsar_mysql_jdbc_sink\"}" typeClassName: "org.apache.pulsar.client.api.schema.GenericRecord" } resources { cpu: 1.0 ram: 1073741824 disk: 10737418240 } componentType: SINK , maxBufferedTuples=1024, functionAuthenticationSpec=null, port=39847, clusterName=pulsar-cluster-1) 16:59:00.426 [main] INFO org.apache.pulsar.functions.runtime.JavaInstanceStarter - RuntimeSpawner quit, shutting down JavaInstance 16:59:00.428 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. 
URL: pulsar://475eab276fee:6650 16:59:00.431 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xa54de7af, L:/10.0.3.44:36866 ! R:475eab276fee/10.0.3.44:6650] Disconnected``` ---- 2020-03-10 17:27:03 UTC - Bobby: so i just made that modification to the conf file and i'm still getting an error ---- 2020-03-10 17:28:40 UTC - Bobby: ```2020/03/10 11:27:46.515 c_client.go:68: [info] INFO | ConnectionPool:85 | Created connection for pulsar://172.27.223.166:6650 2020/03/10 11:27:46.524 c_client.go:68: [info] INFO | ClientConnection:330 | [10.169.240.171:60285 -> 172.27.223.166:6650] Connected to broker 2020/03/10 11:27:46.547 c_client.go:68: [info] INFO | HandlerBase:53 | [persistent://public/prad/my-topic2, ] Getting connection from pool 2020/03/10 11:27:46.733 c_client.go:68: [info] INFO | ConnectionPool:85 | Created connection for pulsar://192.168.74.22:6650 2020/03/10 11:27:46.740 c_client.go:68: [info] INFO | ClientConnection:332 | [10.169.240.171:60286 -> 172.27.223.166:6650] Connected to broker through proxy. Logical broker: pulsar://192.168.74.22:6650 2020/03/10 11:27:46.989 c_client.go:68: [info] WARN | ClientConnection:925 | [10.169.240.171:60286 -> 172.27.223.166:6650] Received error response from server: 11 -- req_id: 0 2020/03/10 11:27:46.989 c_client.go:68: [info] ERROR | ProducerImpl:219 | [persistent://public/prad/my-topic2, ] Failed to create producer: BrokerPersistenceError 2020/03/10 11:27:46.989 c_client.go:68: [info] INFO | ProducerImpl:474 | Producer - [persistent://public/prad/my-topic2, ] , [batching = off] 2020/03/10 11:27:46 Could not instantiate Pulsar producer: Failed to create Producer: BrokerPersistenceError exit status 1``` ---- 2020-03-10 17:29:21 UTC - Andy Papia: I used this code to measure the delay but I'm getting about 2300 ms. Is this unexpected? Is there a faster way to create topics? 
```long producerStart = System.currentTimeMillis(); producer = client.newProducer() .topic(streamId) .create(); long producerEnd = System.currentTimeMillis(); log.info("Producer creation in {} ms", (producerEnd - producerStart));``` ---- 2020-03-10 17:30:33 UTC - Bobby: looks like it's erroring on the send ---- 2020-03-10 17:33:20 UTC - Liam Condon: when creating a producer a producer name must be specified ---- 2020-03-10 17:34:36 UTC - Bobby: i just tried that as well ---- 2020-03-10 17:34:37 UTC - Bobby: ```2020/03/10 11:34:04.843 c_client.go:68: [info] ERROR | ProducerImpl:219 | [persistent://public/prad/my-topic2, producer1] Failed to create producer: BrokerPersistenceError``` ---- 2020-03-10 17:47:53 UTC - Liam Condon: your brokers and bookkeepers are properly configured and running? ---- 2020-03-10 17:52:44 UTC - Sijie Guo: FYI (in case you are not on the mailing lists or don’t follow the @PulsarSummit Twitter handle) In light of growing concern and the further spread of the COVID-19 (Corona) virus and after close consultation with event stakeholders and the organizing parties, we have decided to reschedule the Pulsar Summit San Francisco *from April to August/September*. It was decided that with due regard to the health and safety of our attendees, our hosting city, and the ever-increasing travel restrictions, it was necessary to reschedule the event. We are still working closely with the conference center on confirming the new date for the conference. We are holding off on publishing the schedule and opening the registration until the rescheduled date is confirmed. Thank you very much for your understanding! Please reach out to us if you have any questions. +1 : Devin G. 
Bost, Karthik Ramasamy, Luke Lu, David Kjerrumgaard, SuyambuGanesh, Jeon.DeukJin ---- 2020-03-10 18:27:58 UTC - Pierre-Yves Lebecq: Hello :wave:, sorry for the noob question but I’m currently trying to run simple pulsar functions to interact with function state and I cannot make it work using the simplest possible example. My function is the following: ```package my.functions; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; public class CountMessages implements Function<String, Void> { @Override public Void process(String s, Context context) throws Exception { try { context.incrCounter("test_counter", 1L); } catch (Exception e) { e.printStackTrace(); } return null; } }``` I’m running pulsar with the following docker-compose configuration: ```version: "3.7" services: pulsar: image: apachepulsar/pulsar-all:2.5.0 environment: - PULSAR_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" command: > /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone" volumes: - "/pulsar/data" - "/pulsar/conf" - "./functions/out/artifacts:/functions/artifacts" ports: - "6650:6650" - "8080:8080"``` I’m running my function like this: ```docker-compose exec pulsar bin/pulsar-admin functions localrun --jar /functions/artifacts/functions.jar --classname my.functions.CountMessages --inputs tasks``` And when I produce a message on the topic tasks, I get the following output: ```java.lang.IllegalStateException: State is not enabled. 
at com.google.common.base.Preconditions.checkState(Preconditions.java:507) at org.apache.pulsar.functions.instance.ContextImpl.ensureStateEnabled(ContextImpl.java:262) at org.apache.pulsar.functions.instance.ContextImpl.incrCounter(ContextImpl.java:273) at my.functions.CountMessages.process(CountMessages.java:10) at my.functions.CountMessages.process(CountMessages.java:6) at org.apache.pulsar.functions.instance.JavaInstance.handleMessage(JavaInstance.java:63) at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:269) at java.lang.Thread.run(Thread.java:748)``` I don’t see anywhere in the documentation saying state is not enabled by default or how to enable/disable it. Can anyone point me to the right place with instructions to enable function state? (sorry for the long message) ---- 2020-03-10 18:45:33 UTC - David Kjerrumgaard: You can create them before-hand using the CLI rather than relying on the auto-create feature inside the code. <https://pulsar.apache.org/docs/en/pulsar-admin/#create-3> ---- 2020-03-10 18:46:21 UTC - David Kjerrumgaard: There is some setup inside the broker and ZK to establish a topic, but it is a one-time cost. Do you plan on creating topics frequently? ---- 2020-03-10 18:50:10 UTC - David Kjerrumgaard: @Pierre-Yves Lebecq Can you try submitting the function using the `create` sub-command rather than `localrun`, which runs the function on your local machine? <https://pulsar.apache.org/docs/en/pulsar-admin/#create-1> ---- 2020-03-10 19:04:46 UTC - Sijie Guo: @Sijie Guo set the channel topic: - Pulsar 2.5.0 released! <http://pulsar.apache.org/release-notes/#250-mdash-2019-12-06-a-id250a> - Pulsar Summit SF 2020 (rescheduled to August/September): <https://pulsar-summit.org/> ---- 2020-03-10 19:12:21 UTC - Bobby: yeah, and they're able to accept connections locally ---- 2020-03-10 19:14:41 UTC - Bobby: what types of configurations on the bookkeepers are necessary for this to work though? 
---- 2020-03-10 19:19:29 UTC - Ian: Does anyone know where I can find out more about per-subscriber throttling? (mentioned at the end of <https://github.com/apache/pulsar/wiki/PIP-3:-Message-dispatch-throttling>) ---- 2020-03-10 19:37:04 UTC - Ian: I was thinking of the situation where there is a long backlog on a topic (due to a retention policy) and a new subscription is added. It would be desirable for this subscription to be rate limited so that it does not affect the consumption rates of the other subscriptions. ---- 2020-03-10 19:57:23 UTC - Sijie Guo: <http://pulsar.apache.org/docs/en/pulsar-admin/#set-subscription-dispatch-rate> ---- 2020-03-10 19:58:49 UTC - Ian: That would be a single rate for all subscriptions in a namespace. Is it possible to have different rates for different subscriptions within the same namespace? ---- 2020-03-10 19:59:12 UTC - Bobby: does the client need to connect directly to one of the broker ips? Can it just go through a vip i created? ---- 2020-03-10 20:01:13 UTC - Sijie Guo: I don’t think we support different dispatch rates for different subscriptions yet. Feel free to raise a github issue for this feature. ---- 2020-03-10 20:01:15 UTC - Alexander Ursu: How might I be able to include metadata for each message into a sink connector, i.e. id of the message, timestamp, etc. ---- 2020-03-10 20:03:15 UTC - Ian: Okay, thank you. I think it may also work just as well to handle the rate limiting in the consumer application itself without a change to Pulsar. ---- 2020-03-10 20:05:30 UTC - Sijie Guo: yes you can tune receiver queue size at consumer side for that purpose. ---- 2020-03-10 20:10:43 UTC - Sijie Guo: If you are writing your own sink, you can get message id, sequence id, timestamps and properties from the record. It is up to the sink implementation to decide how to leverage that information. 
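Ian's idea above of rate limiting inside the consumer application could be approached in several ways. Sijie suggests tuning the receiver queue size; the sketch below instead shows a generic token bucket, which is a different technique and not a Pulsar feature. The class `TokenBucket`, its constructor parameters, and the injected clock are all hypothetical names chosen for this illustration.

```java
import java.util.function.LongSupplier;

// Minimal token-bucket limiter; a generic sketch, not a Pulsar API.
final class TokenBucket {
    private final double ratePerSecond;   // tokens added per second
    private final double capacity;        // maximum burst size
    private final LongSupplier nanoClock; // injectable clock, e.g. System::nanoTime
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(double ratePerSecond, double capacity, LongSupplier nanoClock) {
        if (ratePerSecond <= 0 || capacity < 1) {
            throw new IllegalArgumentException("rate must be > 0 and capacity >= 1");
        }
        this.ratePerSecond = ratePerSecond;
        this.capacity = capacity;
        this.nanoClock = nanoClock;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    // Refills according to elapsed time, then consumes one token if available.
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        double elapsedSeconds = (now - lastRefillNanos) / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + elapsedSeconds * ratePerSecond);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

A consumer loop might call `tryAcquire()` before each receive and pause briefly when it returns false; whether that interacts well with broker-side dispatch for a long backlog is something to measure rather than assume.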
---- 2020-03-10 20:12:21 UTC - Andy Papia: yeah for my use case I was hoping to create topics frequently with low delay ---- 2020-03-10 20:49:25 UTC - Andy Papia: I'm seeing that topic creation takes 1-2 seconds using auto creation when creating a producer through the Java client. Is there any way to make this faster? I was hoping to be able to create new topics frequently with low delay. ---- 2020-03-10 20:51:57 UTC - Devin G. Bost: @Alexander Ursu Sijie is right. That’s the correct way to do it. ---- 2020-03-10 21:02:17 UTC - Joe Francis: @Ian my understanding is that throttling only applies to backlogged consumers, and does not apply to caught-up consumers, if throttling on message rate. ---- 2020-03-10 22:26:24 UTC - Evan Furman: Is there a way to disable batching entirely on the `pulsar-perf` tool? ---- 2020-03-10 22:28:26 UTC - Sijie Guo: specifying `--batch-max-messages` to be 1? ---- 2020-03-10 22:29:33 UTC - Evan Furman: interesting, that one is undocumented it looks like <https://pulsar.apache.org/docs/v2.0.1-incubating/reference/CliTools/> ---- 2020-03-10 22:30:53 UTC - Sijie Guo: are you using v2.0.1-incubating? ---- 2020-03-10 22:32:26 UTC - Sijie Guo: bin/pulsar-perf produce --help ---- 2020-03-10 22:39:48 UTC - Evan Furman: ah ok, so the producer batches by default but the consumer does not ---- 2020-03-10 22:42:26 UTC - Sijie Guo: are you creating a partitioned topic? ---- 2020-03-10 22:48:03 UTC - David Kjerrumgaard: There is an async call for creating producers, which wraps the topic creation. <https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ProducerBuilder.html#createAsync--> ---- 2020-03-10 22:55:23 UTC - Andy Papia: No ---- 2020-03-10 22:57:15 UTC - Andy Papia: I am not positive though. I haven't configured a namespace so it is using whatever the defaults are. ---- 2020-03-10 23:48:00 UTC - Andy Papia: Yeah I actually want to produce records with low delay though, not just do it asynchronously. 
---- 2020-03-10 23:48:37 UTC - Andy Papia: Might need to precreate topics to optimize for that. ---- 2020-03-11 00:40:38 UTC - Ken Huang: thanks ---- 2020-03-11 01:07:37 UTC - Sijie Guo: creating a topic is mainly a metadata operation. have you looked into your zookeeper write latency? ---- 2020-03-11 01:08:20 UTC - Andy Papia: yeah the issue could be there. I'm running ZK on some small t3.medium instances. ---- 2020-03-11 01:09:10 UTC - Sijie Guo: the underlying wire frames are batched. the consumer receives the messages in batches and puts them into a receive queue. the application calls the `receive()` method to dequeue items from the receive queue. in 2.5.0, I think a batch receive method was introduced that you can call to receive a message batch. ---- 2020-03-11 01:10:04 UTC - Sijie Guo: if you have prometheus + grafana setup, you can check the zookeeper dashboard to see if there is any high write latency. ---- 2020-03-11 01:10:15 UTC - Sijie Guo: It might also be related to the disk you use for zookeeper. ---- 2020-03-11 01:10:35 UTC - Andy Papia: ahh good point ---- 2020-03-11 01:23:03 UTC - Andy Papia: I'm a little lost on how to create a topic within a namespace using the Java client. Creating topics looks completely separate from namespace in both the admin client and with a producer. How are they related? Specifically, how do I create a non-partitioned persistent topic with the persistence settings that I set on a namespace? ---- 2020-03-11 01:23:28 UTC - Andy Papia: Does the topic string have to contain the namespace? Is there a default namespace? ---- 2020-03-11 01:24:49 UTC - Andy Papia: I'm guessing that you just have to specify the namespace in the topic String... ---- 2020-03-11 01:27:57 UTC - Sijie Guo: just use the fully qualified name to create the topic: persistent://<tenant>/<namespace>/<topic> ---- 2020-03-11 01:28:09 UTC - Sijie Guo: the namespace settings will be applied to the topic you created. 
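Since the client API takes the topic as a plain string, the `persistent://<tenant>/<namespace>/<topic>` form Sijie describes can be assembled in one place. This is only a sketch: `TopicNames` and `persistentTopic` are hypothetical names, and the check that rejects null or empty segments is an assumption made for the example, not Pulsar's own validation.

```java
// Hypothetical helper for illustration; not part of the Pulsar client.
final class TopicNames {
    private TopicNames() {}

    // Builds "persistent://<tenant>/<namespace>/<topic>" from its three parts.
    static String persistentTopic(String tenant, String namespace, String topic) {
        requireNonEmpty(tenant, "tenant");
        requireNonEmpty(namespace, "namespace");
        requireNonEmpty(topic, "topic");
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    // Assumed rule for this sketch: every segment must be present and non-empty.
    private static void requireNonEmpty(String value, String label) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(label + " must not be empty");
        }
    }
}
```

The returned string would be what gets passed to `client.newProducer().topic(...)`, as in the producer snippet earlier in the log.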
---- 2020-03-11 01:28:34 UTC - Sijie Guo: public/default is the default namespace. ---- 2020-03-11 01:28:50 UTC - Andy Papia: yeah this is what I figured but it is weird that the API just takes a String. ---- 2020-03-11 01:29:07 UTC - Andy Papia: what's the default tenant? ---- 2020-03-11 01:29:20 UTC - Sijie Guo: default tenant is “public” ---- 2020-03-11 01:29:29 UTC - Andy Papia: oh got it ---- 2020-03-11 01:30:22 UTC - Andy Papia: so if I create a namespace called "audio" and apply my persistence settings to it, then I can create topics with String: `persistent://public/audio/<topic>` and it will be in that namespace. ---- 2020-03-11 01:30:52 UTC - Andy Papia: thanks. I feel like this could be added to the javadoc or something so it is a little more intuitive. ---- 2020-03-11 01:31:52 UTC - Sijie Guo: <http://pulsar.apache.org/docs/en/concepts-messaging/#topics> ---- 2020-03-11 01:32:23 UTC - Andy Papia: ahh topic names are URLs... ---- 2020-03-11 01:43:39 UTC - Eric Simon: Andy - Yes it does ---- 2020-03-11 01:44:12 UTC - Eric Simon: ```client.namespaces().createNamespace(s"$tenant/$name")``` +1 : Andy Papia ---- 2020-03-11 02:24:16 UTC - Andy Papia: Can someone give me some context on these error/warnings? 
```2020-03-11 02:20:58,309 [pulsar-client-io-1896-1] ERROR o.a.p.c.i.ClientCnx - [id: 0x3cdad0df, L:/100.96.24.2:39750 - R:my-pulsar-broker.default.svc.cluster.local/100.96.13.11:6650] Close connection because received internal-server error java.lang.IllegalStateException: Namespace bundle public/audio/0x40000000_0x80000000 is being unloaded 2020-03-11 02:20:58,317 [pulsar-client-io-1896-1] WARN .BinaryProtoLookupService - [persistent://public/audio/2d544981-ef9b-4a24-bb6a-685bca20bcf9] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/audio/0x40000000_0x80000000 is being unloaded 2020-03-11 02:20:58,321 [pulsar-client-io-1896-1] WARN a.p.c.i.ConnectionHandler - [persistent://public/audio/2d544981-ef9b-4a24-bb6a-685bca20bcf9] [my-pulsar-14-168] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: java.lang.IllegalStateException: Namespace bundle public/audio/0x40000000_0x80000000 is being unloaded``` ---- 2020-03-11 02:25:11 UTC - Andy Papia: I'm trying to publish to 900 topics in one namespace concurrently and seeing this error. What would cause the namespace bundle to be unloaded? ---- 2020-03-11 02:26:24 UTC - Andy Papia: is there a limit to the number of topics per broker or namespace? ---- 2020-03-11 08:31:46 UTC - Pierre-Yves Lebecq: @David Kjerrumgaard It works when using create. Thank you. I was running the localrun inside the container though, so I expected it to work without any issue. Any suggestion on how to efficiently develop functions? I did not succeed in getting <https://pulsar.apache.org/docs/en/functions-debugging/#debug-with-localrun-mode> to work either, so using create and delete or update would work but it’s far from being practical :sweat: ---- 2020-03-11 09:06:28 UTC - xue: Does pulsar support etcd to replace zookeeper? ----
