2018-10-09 10:47:56 UTC - Nathanial Murphy: Is there a way of submitting a 
proposal for features, or do we just do the work in PRs and ask for review?
----
2018-10-09 11:25:35 UTC - Samuel Sun: <https://github.com/apache/pulsar/wiki> ?
----
2018-10-09 13:21:43 UTC - hj: @Sanjeev Kulkarni Sorry for being late. Thanks 
for the help, I will try it.
----
2018-10-09 13:54:57 UTC - Shalin: `-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g 
-XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled 
-XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis 
-XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 
-XX:+DisableExplicitGC -XX:-ResizePLAB -Dio.netty.leakDetectionLevel=disabled 
-Dio.netty.recycler.maxCapacity.default=1000 
-Dio.netty.recycler.linkCapacity=1024`
----
2018-10-09 14:01:31 UTC - Shalin: The topics get messages of size ~ `6mb` 
published concurrently, max ~ 1000/sec, but intermittently. And the messages 
are consumed/ack'ed ~ 10/min.
----
2018-10-09 14:03:51 UTC - Shalin: The memory used by the broker increases as I 
publish more messages, I tried reducing the `managedLedgerCacheSizeMB` to `512` 
to see if that would help, but there still seems to be a steady increase in the 
memory used as the backlog increases.
----
2018-10-09 14:04:39 UTC - Shalin: @Sijie Guo Let me know if that makes sense, 
or if you need some more context on the problem.
----
2018-10-09 14:59:02 UTC - Igor Zubchenok: Hello! Can we change broker config 
from
```
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=3
managedLedgerDefaultAckQuorum=2
```
to
```
managedLedgerDefaultEnsembleSize=2
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
```
and restart brokers one-by-one without data cleanup?
----
2018-10-09 15:33:40 UTC - Matteo Merli: yes, although you don’t need to restart 
brokers to change this setting. You can apply that at the namespace level and 
it will be automatically applied
----
2018-10-09 15:33:56 UTC - Matteo Merli: `pulsar-admin namespaces 
set-persistence ...`
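For reference, here is a small sketch of the consistency rule these three settings usually have to satisfy in BookKeeper (assumption: ensemble >= write quorum >= ack quorum >= 1). `QuorumCheck` is a hypothetical helper, not part of pulsar-admin. It only shows why 3/3/2 and 2/2/2 are both valid while a combination like 2/3/2 is not.

```java
// Hypothetical helper: checks the usual BookKeeper invariant
// ensemble >= writeQuorum >= ackQuorum >= 1 before applying new values.
class QuorumCheck {

    /** True if (ensemble, writeQuorum, ackQuorum) is a consistent combination. */
    static boolean isValid(int ensemble, int writeQuorum, int ackQuorum) {
        return ackQuorum >= 1 && writeQuorum >= ackQuorum && ensemble >= writeQuorum;
    }

    /** Replicas per entry that may still be pending when a write is acknowledged. */
    static int unackedReplicasAllowed(int writeQuorum, int ackQuorum) {
        return writeQuorum - ackQuorum;
    }

    public static void main(String[] args) {
        System.out.println(isValid(3, 3, 2));             // current settings
        System.out.println(isValid(2, 2, 2));             // proposed settings
        System.out.println(isValid(2, 3, 2));             // 3 copies cannot fit on 2 bookies
        System.out.println(unackedReplicasAllowed(3, 2)); // one replica may lag
        System.out.println(unackedReplicasAllowed(2, 2)); // every replica must ack
    }
}
```

As far as I know, BookKeeper fixes these values in each ledger's metadata when the ledger is created. Existing ledgers therefore keep their old replication, and only ledgers created after the change use the new values.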
----
2018-10-09 15:36:36 UTC - Sijie Guo: @Nathanial Murphy:

you can write a proposal as a PIP (example: 
<https://github.com/apache/pulsar/wiki/PIP-23%3A-Message-Tracing-By-Interceptors>). 
You can write it in a Google Doc or a gist and send an email to 
[email protected]; committers will help you add your proposal to the 
Pulsar wiki.
----
2018-10-09 18:29:04 UTC - William Fry: @Sijie Guo any help here would be 
_awesome_, thx!
----
2018-10-09 18:52:04 UTC - Matteo Merli: @William Fry the 
`managedLedgerCacheSizeMB` does indeed come from direct memory. Also the memory 
used for Netty IO comes from direct memory and it’s pooled. There are Netty 
tunables to reduce the amount of memory retained in the pools under different 
conditions.
In any case, we are thinking of ways to simplify the memory configs.
----
2018-10-09 18:55:17 UTC - Sijie Guo: I think you can reduce 
managedLedgerCacheSizeMB to even smaller (given you have pretty small setup), 
like 128.
----
2018-10-09 18:56:28 UTC - Shalin: @Matteo Merli So if the mem used by the 
broker is approaching ~ `2GB` when the `managedLedgerCacheSizeMB` is set to 
`512mb`, is it safe to assume that netty is eating up the memory?
----
2018-10-09 18:58:27 UTC - Matteo Merli: Yes, there are per-thread caches (for 
perf reasons). You can reduce the mem usage by reducing the pools' granularity. 
E.g., add these system properties:

`-Dio.netty.allocator.numDirectArenas=1 -Dio.netty.allocator.normalCacheSize=0 
-Dio.netty.allocator.maxOrder=9`
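Here is a back-of-the-envelope sketch of what those properties change. It assumes the Netty 4.1 `PooledByteBufAllocator` rules of that period:

- The chunk size is `pageSize << maxOrder`, with an 8 KiB default page and a default `maxOrder` of 11.
- The default number of direct arenas is `min(2 * cores, maxDirectMemory / chunkSize / 2 / 3)`.
- Requests larger than one chunk are allocated outside the pool.

The 32-core figure is an assumption; only `ParallelGCThreads=32` in the flags above hints at it. `normalCacheSize=0` separately turns off the per-thread cache for normal-sized buffers and is not modelled here.

```java
// Rough estimate of Netty pooled direct memory, assuming Netty 4.1 defaults of the time.
class NettyPoolEstimate {
    static final long MIB = 1024L * 1024L;

    /** Size of one pool chunk: pageSize shifted left by maxOrder. */
    static long chunkSize(int pageSize, int maxOrder) {
        return (long) pageSize << maxOrder;
    }

    /** Default arena count when io.netty.allocator.numDirectArenas is not set. */
    static long defaultDirectArenas(int cores, long maxDirectMemory, long chunkSize) {
        return Math.min(2L * cores, maxDirectMemory / chunkSize / 2 / 3);
    }

    /** Requests up to one chunk come from the pool; larger ("huge") ones bypass it. */
    static boolean isPooled(long requestBytes, long chunkSize) {
        return requestBytes <= chunkSize;
    }

    public static void main(String[] args) {
        long maxDirect = 1024 * MIB;              // -XX:MaxDirectMemorySize=1g
        int cores = 32;                           // assumption, not stated in the thread
        long defaultChunk = chunkSize(8192, 11);  // default maxOrder
        long tunedChunk = chunkSize(8192, 9);     // -Dio.netty.allocator.maxOrder=9
        long arenas = defaultDirectArenas(cores, maxDirect, defaultChunk);
        System.out.println("default: " + arenas + " arenas x " + defaultChunk / MIB + " MiB chunks");
        System.out.println("tuned:   1 arena x " + tunedChunk / MIB + " MiB chunks");
        long message = 6 * MIB;                   // ~6 MB payloads mentioned earlier
        System.out.println("pooled with default chunk: " + isPooled(message, defaultChunk));
        System.out.println("pooled with tuned chunk:   " + isPooled(message, tunedChunk));
    }
}
```

Under these rules, the default gives 10 arenas of 16 MiB chunks, while the tuned settings give 1 arena of 4 MiB chunks. A ~6 MB message would no longer fit in a chunk, so it would be allocated outside the pool rather than kept in an arena.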
----
2018-10-09 19:05:53 UTC - Shalin: @Matteo Merli @Sijie Guo Since it has to do 
with netty, the reason we are experiencing this now is because of the number of 
concurrent publishers and not the size of the message itself?
----
2018-10-09 19:24:26 UTC - Matteo Merli: @Jean-Bernard van Zuylen I’d prefer to 
keep it “automatic” if possible, so inheriting the thread-daemon status from 
parent, so that we don’t have more config options (unless there are drawbacks 
with the approach).

I’m still not 100% clear on why this should block Tomcat runtime. Is that 
because there is no easy way to trigger `PulsarClient.close()`? Closing the 
client instance would be stopping all these threads.
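Here is a minimal sketch of the "automatic" behaviour described above. It assumes the daemon flag should come from the thread that builds the client. `InheritDaemonThreadFactory` is a hypothetical name, not a Pulsar class. Plain `java.lang.Thread` already copies the creating thread's daemon flag, but executors may create threads lazily from other pool threads. For that reason, the sketch captures the flag once, when the factory is constructed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: workers get the daemon status of the thread that created the factory.
class InheritDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger();

    InheritDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
        // Captured here, because newThread() may later run on a pool thread
        // whose own daemon flag says nothing about the application's intent.
        this.daemon = Thread.currentThread().isDaemon();
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread t = new Thread(task, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(daemon);
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor(new InheritDaemonThreadFactory("io"));
        pool.submit(() -> System.out.println(Thread.currentThread().getName()
                + " daemon=" + Thread.currentThread().isDaemon()));
        // Non-daemon workers still need an explicit shutdown, just as
        // PulsarClient.close() is what stops the client's threads.
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```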
----
2018-10-09 19:32:46 UTC - Nathanial Murphy: Just following up from before - 
would there be much interest/use to the pulsar user base in splitting up 
message batches inside of pulsar as an option you can configure? It wigs me out 
a little that the write rate determines how messages are acknowledged together, 
but I'm also happy to just use non-batched messages. 
----
2018-10-09 20:17:08 UTC - Alex Mault: Does anyone know what the max length of a 
topic name is (if any)?
----
2018-10-09 20:27:28 UTC - Shay Rybak: @Shay Rybak has joined the channel
----
2018-10-09 20:27:49 UTC - Matteo Merli: @Nathanial Murphy Rather than splitting 
batches in the broker, I’d rather add the individual message granularity on the 
acknowledgement path (with auto discarding of messages already acked within a 
batch)
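To make the idea concrete, here is a hypothetical sketch of per-message acknowledgement inside a batch:

- Each batch entry gets a bitset.
- The entry counts as acknowledged only when every index is set.
- On redelivery, the consumer skips indices that are already acked.

`BatchAckTracker` is an illustration of the proposal, not the Pulsar client API.

```java
import java.util.BitSet;

// Hypothetical tracker for individual acks within one batch entry.
class BatchAckTracker {
    private final int batchSize;
    private final BitSet acked;

    BatchAckTracker(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.acked = new BitSet(batchSize);
    }

    /** Marks one message as acked; returns true once the whole batch is acked. */
    public boolean ack(int indexInBatch) {
        if (indexInBatch < 0 || indexInBatch >= batchSize) {
            throw new IndexOutOfBoundsException("index " + indexInBatch);
        }
        acked.set(indexInBatch);
        return isFullyAcked();
    }

    /** The batch entry can be acked to the broker only when every message is acked. */
    public boolean isFullyAcked() {
        return acked.cardinality() == batchSize;
    }

    /** Indices still to deliver if the batch is redelivered (acked ones are discarded). */
    public int[] pendingIndices() {
        BitSet pending = new BitSet(batchSize);
        pending.set(0, batchSize);
        pending.andNot(acked);
        return pending.stream().toArray();
    }
}
```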
----
2018-10-09 20:28:48 UTC - Matteo Merli: @Alex Mault there’s no max length on 
individual fields. Only limitation is on the whole command frames which are 
capped at 5MB
+1 : Alex Mault
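Here is a small sketch of what that limit means for topic names. It assumes "5MB" is 5 * 1024 * 1024 bytes and that the name travels UTF-8 encoded inside the command. Under those assumptions, the name is bounded only by whatever the rest of the frame leaves over. `FrameBudget` and its `otherBytes` parameter are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical check: does a topic name fit in the frame next to everything else?
class FrameBudget {
    // Assumption: the 5MB frame cap means 5 * 1024 * 1024 bytes.
    static final int MAX_FRAME_BYTES = 5 * 1024 * 1024;

    /** Encoded size of the name, which is what counts against the frame. */
    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True if the name plus the rest of the command (otherBytes) stays within the cap. */
    static boolean fitsInFrame(String topicName, int otherBytes) {
        return (long) utf8Length(topicName) + otherBytes <= MAX_FRAME_BYTES;
    }
}
```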
----
2018-10-09 20:30:18 UTC - Shay Rybak: Hi, I’m trying to set up a kafka source 
using pulsar IO. I’ve created a source to read from a local kafka instance with 
the following config:
```
configs:
  bootstrapServers: localhost:9092
  topic: pulsar
  groupId: pulsar
```
----
2018-10-09 20:30:21 UTC - Shay Rybak: `bin/pulsar-admin source create --tenant 
public --namespace default --name kafka --sourceConfigFile 
examples/kafka-source.yaml --source-type kafka --destinationTopicName kafka`
----
2018-10-09 20:30:47 UTC - Shay Rybak: I’m having a hard time troubleshooting 
why I don’t see any messages in the destination topic
----
2018-10-09 20:36:42 UTC - Matteo Merli: There should be a log file for the 
PulsarIO source, created on the broker
----
2018-10-09 20:37:49 UTC - Shay Rybak: I believe this is it:
```
15:35:21.845 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /10.184.117.51:50812
15:35:21.850 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.184.117.51:50812][persistent://public/default/kafka] Creating producer. producerId=0
15:35:21.850 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.184.117.51:50812] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/kafka}, client=/10.184.117.51:50812, producerName=dal12-02-0-103, producerId=0}
15:35:22.124 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/kafka}][dal12-02-0-103] Closing producer on cnx /10.184.117.51:50812
15:35:22.124 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/kafka}][dal12-02-0-103] Closed producer on cnx /10.184.117.51:50812
15:35:22.137 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /10.184.117.51:50812
```
I see that every 30 seconds or so. It looks like it’s creating the producer to 
push messages but not pushing anything.
----
2018-10-09 20:39:49 UTC - Shay Rybak: it has no details about pulling from 
kafka, if it failed or not
----
2018-10-09 20:40:53 UTC - Matteo Merli: let me try locally
----
2018-10-09 20:47:41 UTC - Matteo Merli: @Shay Rybak did you download the 
connectors?
----
2018-10-09 20:48:15 UTC - Matteo Merli: You can try also the `localrun` mode. 
eg: `bin/pulsar-admin source localrun  --name kafka --sourceConfigFile 
/tmp/kafka-source.yml --source-type kafka --destinationTopicName kafka`
----
2018-10-09 20:48:37 UTC - Sijie Guo: @Shay Rybak :

- where are you running? in standalone mode or in a cluster mode.
- you can use `bin/pulsar-admin functions getstatus  --tenant public 
--namespace default --name kafka` to query the running status
----
2018-10-09 20:48:39 UTC - Matteo Merli: That prints `Invalid source type 
'kafka' -- Available sources are: [twitter]` since the kafka connector is not 
“available”
----
2018-10-09 20:49:51 UTC - Shay Rybak: yes I did download, this is running in 
cluster mode
----
2018-10-09 20:50:19 UTC - Shay Rybak:
```
$ bin/pulsar-admin functions getstatus --tenant public --namespace default --name kafka
{
  "functionStatusList": [
    {
      "numRestarts": "69",
      "instanceId": "0",
      "workerId": "c-dal12-02-fw-<host>-8080"
    }
  ]
}
```
----
2018-10-09 20:50:58 UTC - Sijie Guo: @Shay Rybak: if it is in cluster mode, you 
need to point bootstrapServers to the kafka brokers address
----
2018-10-09 20:51:28 UTC - Shay Rybak: kafka is running in a cluster on the same 
nodes, so that broker address is fine for all nodes
----
2018-10-09 20:51:49 UTC - Sijie Guo: oh i see
----
2018-10-09 20:51:49 UTC - Matteo Merli: ```
13:50:34.861 [public/default/kafka-0] ERROR 
org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public-kafka] 
Uncaught exception in Java Instance
java.lang.NullPointerException: null
        at 
org.apache.pulsar.io.kafka.KafkaAbstractSource.open(KafkaAbstractSource.java:59)
 ~[pulsar-io-kafka-2.2.0-SNAPSHOT.nar-unpacked/:?]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:570)
 ~[java-instance.jar:2.2.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:162)
 ~[java-instance.jar:2.2.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:188)
 [java-instance.jar:2.2.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
```
----
2018-10-09 20:52:34 UTC - Matteo Merli: Some of the parameters missing from 
the yml file are causing the NPE
----
2018-10-09 20:52:54 UTC - Matteo Merli: (though I think that should be fixed in 
the source itself)
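Here is one way the source itself could avoid the NPE, sketched under the following assumptions:

- Optional numeric settings fall back to defaults instead of being unboxed from a missing (null) value.
- Required settings fail with a clear message.

`KafkaSourceSettings` is hypothetical. The keys mirror the YAML in this thread, and the defaults are simply the values suggested here, not necessarily the connector's own. Whether unboxing is the actual cause at `KafkaAbstractSource.java:59` is a guess.

```java
import java.util.Map;

// Hypothetical settings holder that tolerates missing optional keys.
class KafkaSourceSettings {
    final String bootstrapServers;
    final String topic;
    final String groupId;
    final long fetchMinBytes;
    final long autoCommitIntervalMs;
    final long sessionTimeoutMs;

    KafkaSourceSettings(Map<String, Object> configs) {
        this.bootstrapServers = required(configs, "bootstrapServers");
        this.topic = required(configs, "topic");
        this.groupId = required(configs, "groupId");
        // Defaults taken from the values suggested in this thread (assumption).
        this.fetchMinBytes = optionalLong(configs, "fetchMinBytes", 1024L);
        this.autoCommitIntervalMs = optionalLong(configs, "autoCommitIntervalMs", 1000L);
        this.sessionTimeoutMs = optionalLong(configs, "sessionTimeoutMs", 30000L);
    }

    /** Fails fast with a readable error instead of a later NullPointerException. */
    private static String required(Map<String, Object> configs, String key) {
        Object v = configs.get(key);
        if (v == null) {
            throw new IllegalArgumentException("Missing required Kafka source config: " + key);
        }
        return v.toString();
    }

    /** YAML parsers may yield Integer, Long or String; a missing key uses the default. */
    private static long optionalLong(Map<String, Object> configs, String key, long defaultValue) {
        Object v = configs.get(key);
        if (v == null) {
            return defaultValue;
        }
        if (v instanceof Number) {
            return ((Number) v).longValue();
        }
        return Long.parseLong(v.toString().trim());
    }
}
```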
----
2018-10-09 20:53:02 UTC - Sijie Guo: oh i see
----
2018-10-09 20:53:04 UTC - Shay Rybak:
```
bin/pulsar-admin source available-sources
kafka
Kafka source and sink connector
----------------------------------------
rabbitmq
RabbitMQ source connector
----------------------------------------
twitter
Ingest data from Twitter firehose
----------------------------------------
```
----
2018-10-09 20:54:26 UTC - Shay Rybak: so is this a configuration issue on my 
end or is the source broken?
----
2018-10-09 20:54:31 UTC - Sijie Guo: one second
----
2018-10-09 20:55:04 UTC - Matteo Merli: @Shay Rybak try with:

```
configs:
  bootstrapServers: localhost:9092
  topic: pulsar
  groupId: pulsar
  fetchMinBytes: 1024
  autoCommitIntervalMs: 1000
  sessionTimeoutMs: 30000
```

In the Yaml file
----
2018-10-09 20:56:28 UTC - Nathanial Murphy: Doesn't that mean only a single 
consumer would still be dealing with a single batch? It's a better problem to 
have, admittedly. 
----
2018-10-09 20:57:04 UTC - Sijie Guo: @Matteo Merli you are fast
----
2018-10-09 20:57:18 UTC - Shay Rybak:
```
bin/pulsar-admin functions getstatus --tenant public --namespace default --name kafka
{
  "functionStatusList": [
    {
      "running": true,
      "instanceId": "0",
      "metrics": {
        "metrics": {
          "__total_processed__": {},
          "__total_successfully_processed__": {},
          "__total_system_exceptions__": {},
          "__total_user_exceptions__": {},
          "__total_serialization_exceptions__": {},
          "__avg_latency_ms__": {}
        }
      },
      "workerId": "<>"
    }
  ]
}
```
----
2018-10-09 20:57:32 UTC - Shay Rybak: looks better but still can’t fetch 
messages
----
2018-10-09 20:57:43 UTC - Matteo Merli: Yes. Batching is still a way to get 
higher throughput at lower cost. If you have high throughput, it is still better 
to handle the dispatching in batches rather than for every message.
----
2018-10-09 21:00:51 UTC - Matteo Merli: Seeing exception:

```
Exception in thread "Kafka Source Thread" java.lang.ClassCastException: 
java.lang.String cannot be cast to [B
        at 
org.apache.pulsar.io.kafka.KafkaStringSource.extractValue(KafkaStringSource.java:31)
        at 
org.apache.pulsar.io.kafka.KafkaStringSource.extractValue(KafkaStringSource.java:28)
        at 
org.apache.pulsar.io.kafka.KafkaAbstractSource.lambda$start$0(KafkaAbstractSource.java:109)
        at java.lang.Thread.run(Thread.java:745)
```
----
2018-10-09 21:01:34 UTC - Sijie Guo: which version is this?
----
2018-10-09 21:01:51 UTC - Sijie Guo: I think this is in between some changes.
----
2018-10-09 21:02:11 UTC - Sijie Guo: latest master?
----
2018-10-09 21:02:25 UTC - Matteo Merli: yes, it’s probably just on my machine
----
2018-10-09 21:02:35 UTC - Shay Rybak: I’m using 2.1.1 binary
----
2018-10-09 21:04:32 UTC - Sijie Guo: one second
----
2018-10-09 21:05:21 UTC - Nathanial Murphy: I'm guessing there's no facility to 
both ack and publish new messages as part of the same "transaction" either? I 
imagine that would alleviate some of the pains of two consumers publishing the 
same response to the same batched message. 
----
2018-10-09 21:09:45 UTC - Matteo Merli: Not yet, though there are plans to work 
on transactions
----
2018-10-09 21:12:14 UTC - Nathanial Murphy: Sweet. So it sounds like it could 
be feasible. I guess I have a proposal to attempt. :stuck_out_tongue:
----
2018-10-09 21:12:42 UTC - Sijie Guo: @Matteo Merli: I think your change sounds 
different (it might be related to the schema changes in master).
----
2018-10-09 21:14:05 UTC - Matteo Merli: Definitely feasible, just a matter of 
time :slightly_smiling_face:
----
2018-10-09 21:14:08 UTC - Sijie Guo: @Shay Rybak: on your pulsar cluster, if 
you go to the node that ‘workerId’ points to, you can see the connector logs under 
${PULSAR_HOME}/logs/public/default/kafka
----
2018-10-09 21:16:25 UTC - Shay Rybak:
```
15:55:47.177 [Kafka Source Thread] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group pulsar with generation 1
15:55:47.178 [Kafka Source Thread] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [pulsar-0] for group pulsar
```
----
2018-10-09 21:16:39 UTC - Shay Rybak: these are the last messages in that log, 
no errors that I can see
----
2018-10-09 21:18:04 UTC - Sijie Guo: are you still publishing data to the 
kafka topic?
----
2018-10-09 21:21:08 UTC - Shay Rybak: yes, I publish every few minutes to make 
sure
----
2018-10-09 21:21:30 UTC - Shay Rybak: I deleted and recreated the source, 
cleaning the log in between
----
2018-10-09 21:21:45 UTC - Shay Rybak:
```
16:19:50,810 INFO [public/default/kafka] [instance-0] JavaInstanceRunnable - Starting Java Instance kafka
16:19:50,874 INFO [public/default/kafka] [instance-0] JavaInstanceRunnable - Initialize function class loader for function kafka at function cache manager
```
----
2018-10-09 21:21:56 UTC - Shay Rybak: that is all that is in the log
----
2018-10-09 21:22:26 UTC - Shay Rybak: 
`logs/functions/public/default/kafka/kafka-0.log`
----
2018-10-09 21:22:31 UTC - Shay Rybak: that is the location of the log file
----
2018-10-09 21:23:26 UTC - Sijie Guo: okay give me a few minutes
----
2018-10-09 21:40:57 UTC - Sijie Guo: ```
configs:
  bootstrapServers: localhost:9092
  topic: pulsar
  groupId: pulsar
  fetchMinBytes: 1024
  autoCommitIntervalMs: 1000
  sessionTimeoutMs: 30000
  valueDeserializationClass: org.apache.kafka.common.serialization.ByteArrayDeserializer
```
----
2018-10-09 21:41:09 UTC - Sijie Guo: @Shay Rybak can you try this one?
----
2018-10-09 21:41:29 UTC - Sijie Guo: set the valueDeserializationClass for 
kafka connector to ByteArrayDeserializer
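Here is a self-contained sketch of why this works. `[B` is the JVM's name for `byte[]`. The earlier `ClassCastException` ("java.lang.String cannot be cast to [B") suggests the source casts each Kafka value to `byte[]` while the consumer was producing `String` values. The two lambdas below are stand-ins for Kafka's `StringDeserializer` and `ByteArrayDeserializer`. No Kafka classes are used, and `extractValue` only imitates the cast; it is not the connector's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Reproduces "String cannot be cast to [B" without Kafka on the classpath.
class ValueCastDemo {
    // Stand-in for org.apache.kafka.common.serialization.StringDeserializer
    static final Function<byte[], Object> STRING_DESERIALIZER =
            raw -> new String(raw, StandardCharsets.UTF_8);
    // Stand-in for org.apache.kafka.common.serialization.ByteArrayDeserializer
    static final Function<byte[], Object> BYTE_ARRAY_DESERIALIZER = raw -> raw;

    /** Imitates a source that expects raw bytes from the consumer. */
    static byte[] extractValue(Object value) {
        return (byte[]) value; // ClassCastException if value is a String
    }

    /** True if the value produced by this deserializer survives the byte[] cast. */
    static boolean castSucceeds(Function<byte[], Object> deserializer, byte[] wire) {
        try {
            extractValue(deserializer.apply(wire));
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] wire = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(byte[].class.getName());
        System.out.println(castSucceeds(STRING_DESERIALIZER, wire));
        System.out.println(castSucceeds(BYTE_ARRAY_DESERIALIZER, wire));
    }
}
```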
----
2018-10-09 21:41:51 UTC - Shay Rybak: trying
----
2018-10-09 21:42:23 UTC - Shay Rybak: works
----
2018-10-09 21:42:29 UTC - Sijie Guo: great
----
2018-10-09 21:42:40 UTC - Sijie Guo: although I think the kafka source connector 
needs to be improved.
----
2018-10-09 21:43:11 UTC - Sijie Guo: I will handle those improvements 
:slightly_smiling_face:
----
2018-10-09 21:43:35 UTC - Shay Rybak: Thank you for your assistance
----
2018-10-09 21:43:37 UTC - Sijie Guo: I will also improve the documentation
----
2018-10-09 21:43:49 UTC - Shay Rybak: maybe paste that config as an example
----
2018-10-09 21:43:53 UTC - Sijie Guo: yes
----
2018-10-09 21:44:07 UTC - Shay Rybak: feel free to use this as well 
`bin/pulsar-admin source create --tenant public --namespace default --name 
kafka --sourceConfigFile examples/kafka-source.yaml --source-type kafka 
--destinationTopicName kafka`
----
2018-10-09 21:44:11 UTC - Sijie Guo: I will add examples configs
----
2018-10-09 21:44:24 UTC - Sijie Guo: yes :+1:
----
2018-10-09 21:44:33 UTC - Sijie Guo: I will use your command 
:slightly_smiling_face:
----
2018-10-09 21:45:49 UTC - Shay Rybak: :pray:
----
2018-10-10 02:40:30 UTC - jerser: @jerser has joined the channel
----
2018-10-10 03:45:38 UTC - alex xu: @alex xu has joined the channel
----
2018-10-10 04:29:29 UTC - science09: @science09 has joined the channel
----
2018-10-10 07:13:52 UTC - Nicolas Ha: Still haven’t got to using Daemonset in 
the helm kubernetes - hopefully I get to play today :smile:
----
