2020-03-06 10:08:24 UTC - Vladimir Shchur: Hi! I'm trying to run a standalone
functions worker (because the setup with functionsWorkerEnabled=true doesn't
support state in the 2.5.0 helm chart), but I'm unable to make it work. What
I've achieved so far: the worker is functioning normally and I can see
successful worker requests in the broker logs, but a `curl
<http://pulsar-proxy:8080/admin/v2/worker/cluster>` request returns a "Function
worker service is not done initializing" error. The proxy is configured
correctly, but /admin/v2/worker requests are not routed to the function worker
according to
<https://github.com/apache/pulsar/blob/branch-2.5/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java#L263-L264|those
lines>, and the broker is unaware of the function worker's existence. How can I
make this work?
----
2020-03-06 10:20:25 UTC - Sijie Guo: there is a pull request out for this fix.
----
2020-03-06 10:20:58 UTC - Sijie Guo: Currently the workaround is to use the
`v2` admin api.
----
2020-03-06 11:19:01 UTC - Vladimir Shchur: @Sijie Guo I'm afraid this is a
different problem. I do use the v2 admin api, but I still get the "Function
worker service is not done initializing" error, which means the broker doesn't
know about the function worker and the broker doesn't go through the proxy. Do
you still think it is related? Or should the broker not know about workers,
with `/admin/v2/worker` requests also being redirected to the worker node?
----
2020-03-06 12:07:29 UTC - Pierre-Yves Lebecq: I just did it and ran it but I
have the exact same error unfortunately. :disappointed:
----
2020-03-06 13:21:56 UTC - Vladimir Shchur: I've managed to hack the admin
commands to create a function inside the worker, but I'm facing another issue:
the function doesn't process the input topic, although its status is Running,
I don't see any errors in the function logs, and there are no errors in the
function status. Topic stats don't show any active consumer. Any ideas what to
do next?
----
2020-03-06 14:28:40 UTC - Guilherme Perinazzo: I think the problem I'm seeing
with MessageTooBig when I try to send as fast as possible is that my producer
queue is getting full, and I didn't have it set to block when the queue is
full
----
2020-03-06 14:33:32 UTC - Guilherme Perinazzo: My C lib wrapper may be off by
one on the pulsar result mapping
----
2020-03-06 15:03:10 UTC - Stefan Lipinski: @Stefan Lipinski has joined the
channel
----
2020-03-06 15:47:11 UTC - Gilles Barbier: Hi, I cannot find any published
roadmap for Pulsar. Is there one somewhere? Sorry if it’s a trivial question.
+1 : Martina Oefelein
----
2020-03-06 15:53:16 UTC - Bobby: I built it from scratch and pushed it to our
own registry
----
2020-03-06 17:00:47 UTC - Vladimir Shchur: @Chris Bartholomew Can you please
help with this? I'm running your function worker pointing to bookkeeper; is
there anything else I can do?
----
2020-03-06 17:03:42 UTC - Chris Bartholomew: @Vladimir Shchur do the function
logs show that it has connected to the state storage?
----
2020-03-06 17:03:47 UTC - Sijie Guo: Did you configure the function worker
service url at proxy?
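For reference, a minimal sketch of that setting in proxy.conf; the worker
hostname and port here are illustrative, not taken from this thread:
```# proxy.conf: point the proxy at the standalone function worker
functionWorkerWebServiceURL=http://pulsar-function:6750```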
----
2020-03-06 17:06:34 UTC - Sijie Guo: You can search for the “2.6.0 release”
GitHub issue. That issue is used for tracking the upcoming features.
----
2020-03-06 17:06:57 UTC - Sijie Guo: Ah, PIPs are another place to check for
new things coming up.
----
2020-03-06 17:07:39 UTC - Sijie Guo: Are you batching messages together?
----
2020-03-06 17:08:02 UTC - Vladimir Shchur: ```12:35:49.294 [main] INFO
org.apache.pulsar.functions.runtime.JavaInstanceStarter - JavaInstance Server
started, listening on 38909
12:35:49.295 [main] INFO
org.apache.pulsar.functions.runtime.JavaInstanceStarter - Starting
runtimeSpawner
12:35:49.296 [main] INFO org.apache.pulsar.functions.runtime.RuntimeSpawner -
public/default/exclamation-0 RuntimeSpawner starting function
12:35:49.301 [main] INFO
org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer
starting function with instance config InstanceConfig(instanceId=0,
functionId=9e3c5869-2861-49d1-91a4-ce1730b3d466,
functionVersion=1bac032b-e26b-478f-a502-8549b3b9bbd3, functionDetails=tenant:
"public"
namespace: "default"
name: "exclamation"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
autoAck: true
parallelism: 1
source {
typeClassName: "java.lang.String"
inputSpecs {
key: "<persistent://public/default/exclamation-input>"
value {
}
}
cleanupSubscription: true
}
sink {
topic: "<persistent://public/default/exclamation-output>"
typeClassName: "java.lang.String"
}
resources {
cpu: 1.0
ram: 1073741824
disk: 10737418240
}
componentType: FUNCTION
, maxBufferedTuples=1024, functionAuthenticationSpec=null, port=38909,
clusterName=pulsar)
12:35:49.303 [main] INFO
org.apache.pulsar.functions.runtime.JavaInstanceStarter - Starting metrics
server on port 42721
12:35:49.312 [public/default/exclamation-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Starting Java
Instance exclamation :
Details = tenant: "public"
namespace: "default"
name: "exclamation"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
autoAck: true
parallelism: 1
source {
typeClassName: "java.lang.String"
inputSpecs {
key: "<persistent://public/default/exclamation-input>"
value {
}
}
cleanupSubscription: true
}
sink {
topic: "<persistent://public/default/exclamation-output>"
typeClassName: "java.lang.String"
}
resources {
cpu: 1.0
ram: 1073741824
disk: 10737418240
}
componentType: FUNCTION
12:35:49.313 [public/default/exclamation-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Load JAR:
/tmp/pulsar_functions/public/default/exclamation/0/api-examples.jar
12:35:49.323 [public/default/exclamation-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Initialize function
class loader for function exclamation at function cache manager
12:44:11.235 [main] INFO
org.apache.pulsar.functions.runtime.JavaInstanceStarter - JavaInstance Server
started, listening on 37259
12:44:11.239 [main] INFO
org.apache.pulsar.functions.runtime.JavaInstanceStarter - Starting
runtimeSpawner
12:44:11.240 [main] INFO org.apache.pulsar.functions.runtime.RuntimeSpawner -
public/default/exclamation-0 RuntimeSpawner starting function
12:44:11.247 [main] INFO
org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer
starting function with instance config InstanceConfig(instanceId=0,
functionId=34ee4317-5698-4178-b176-d88df5c03342,
functionVersion=fcd8d52a-3982-4a40-9712-90ba84eecc53, functionDetails=tenant:
"public"
namespace: "default"
name: "exclamation"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
autoAck: true
parallelism: 1
source {
typeClassName: "java.lang.String"
inputSpecs {
key: "<persistent://public/default/exclamation-input>"
value {
}
}
cleanupSubscription: true
}
sink {
topic: "<persistent://public/default/exclamation-output>"
typeClassName: "java.lang.String"
}
resources {
cpu: 1.0
ram: 1073741824
disk: 10737418240
}
componentType: FUNCTION
, maxBufferedTuples=1024, functionAuthenticationSpec=null, port=37259,
clusterName=pulsar)
12:44:11.256 [main] INFO
org.apache.pulsar.functions.runtime.JavaInstanceStarter - Starting metrics
server on port 43645
12:44:11.268 [public/default/exclamation-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Starting Java
Instance exclamation :
Details = tenant: "public"
namespace: "default"
name: "exclamation"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
autoAck: true
parallelism: 1
source {
typeClassName: "java.lang.String"
inputSpecs {
key: "<persistent://public/default/exclamation-input>"
value {
}
}
cleanupSubscription: true
}
sink {
topic: "<persistent://public/default/exclamation-output>"
typeClassName: "java.lang.String"
}
resources {
cpu: 1.0
ram: 1073741824
disk: 10737418240
}
componentType: FUNCTION
12:44:11.269 [public/default/exclamation-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Load JAR:
/tmp/pulsar_functions/public/default/exclamation/0/api-examples.jar
12:44:11.282 [public/default/exclamation-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Initialize function
class loader for function exclamation at function cache manager```
----
2020-03-06 17:08:03 UTC - Gilles Barbier: :+1: thx
----
2020-03-06 17:08:33 UTC - Vladimir Shchur: This is all I have in the function
logs, and the function itself is the exclamation function, which doesn't even
use state
----
2020-03-06 17:12:28 UTC - Vladimir Shchur: Yes, but it doesn't help. The PR was
updated for this case
<https://github.com/apache/pulsar/pull/6486>
----
2020-03-06 17:22:49 UTC - Sijie Guo: I see
----
2020-03-06 17:24:17 UTC - Sijie Guo: 1) Does your function worker point to the
broker? What does your function worker config look like?
2) What runtime do you run?
----
2020-03-06 17:27:46 UTC - Chris Bartholomew: @Vladimir Shchur What Pulsar
version are you running? I will set it up in my environment and see if it works.
----
2020-03-06 17:30:11 UTC - Vladimir Shchur: 1. Yes, it does, and it works
correctly; I'll give some screenshots below. The config is:
```12:43:55.257 [main] INFO org.apache.pulsar.functions.worker.WorkerService -
Worker Configs: {
"workerId" : "pulsar-function-0",
"workerHostname" : "pulsar-function-0.pulsar-function",
"workerPort" : 6750,
"workerPortTls" : null,
"jvmGCMetricsLoggerClassName" : null,
"numHttpServerThreads" : 8,
"configurationStoreServers" : "pulsar-zookeeper-0.pulsar-zookeeper",
"zooKeeperSessionTimeoutMillis" : 30000,
"zooKeeperOperationTimeoutSeconds" : 30,
"connectorsDirectory" : "./connectors",
"functionMetadataTopicName" : "metadata",
"functionWebServiceUrl" : null,
"pulsarServiceUrl" : "<pulsar://pulsar-broker:6650>",
"pulsarWebServiceUrl" : "<http://pulsar-broker:8080>",
"clusterCoordinationTopicName" : "coordinate",
"pulsarFunctionsNamespace" : "public/functions",
"pulsarFunctionsCluster" : "pulsar",
"numFunctionPackageReplicas" : 1,
"downloadDirectory" : "/tmp/pulsar_functions",
"stateStorageServiceUrl" : "<bk://pulsar-bookkeeper:4181>",
"functionAssignmentTopicName" : "assignments",
"schedulerClassName" :
"org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler",
"failureCheckFreqMs" : 30000,
"rescheduleTimeoutMs" : 60000,
"initialBrokerReconnectMaxRetries" : 60,
"assignmentWriteMaxRetries" : 60,
"instanceLivenessCheckFreqMs" : 30000,
"clientAuthenticationPlugin" : null,
"clientAuthenticationParameters" : null,
"bookkeeperClientAuthenticationPlugin" : null,
"bookkeeperClientAuthenticationParametersName" : null,
"bookkeeperClientAuthenticationParameters" : null,
"topicCompactionFrequencySec" : 1800,
"tlsEnabled" : false,
"tlsCertificateFilePath" : null,
"tlsKeyFilePath" : null,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsRequireTrustedClientCertOnConnect" : false,
"useTls" : false,
"tlsHostnameVerificationEnable" : false,
"tlsCertRefreshCheckDurationSec" : 300,
"authenticationEnabled" : false,
"authenticationProviders" : [ ],
"authorizationEnabled" : false,
"authorizationProvider" :
"org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider",
"superUserRoles" : [ ],
"properties" : { },
"functionRuntimeFactoryClassName" : null,
"functionRuntimeFactoryConfigs" : null,
"secretsProviderConfiguratorClassName" : null,
"secretsProviderConfiguratorConfig" : null,
"functionInstanceMinResources" : null,
"functionAuthProviderClassName" : null,
"runtimeCustomizerClassName" : null,
"runtimeCustomizerConfig" : { },
"threadContainerFactory" : null,
"processContainerFactory" : {
"javaInstanceJarLocation" : null,
"pythonInstanceLocation" : null,
"logDirectory" : null,
"extraFunctionDependenciesDir" : null
},
"kubernetesContainerFactory" : null,
"functionMetadataTopic" : "<persistent://public/functions/metadata>",
"clusterCoordinationTopic" : "<persistent://public/functions/coordinate>",
"functionAssignmentTopic" : "<persistent://public/functions/assignments>",
"tlsTrustChainBytes" : null,
"workerWebAddress" : "<http://pulsar-function-0.pulsar-function:6750>"
}```
2) Runtime is _process_
----
2020-03-06 17:31:26 UTC - Vladimir Shchur: All worker logs
<https://pastebin.com/5Q8UYSfu>
----
2020-03-06 17:33:05 UTC - Vladimir Shchur: @Chris Bartholomew it is 2.5.0 from
your helm chart with all native pulsar images
+1 : Chris Bartholomew
----
2020-03-06 17:37:44 UTC - Sijie Guo: What is the output when you run “topic
stats” over your input topic?
----
2020-03-06 17:38:17 UTC - Sijie Guo: What is the output when you run “topic
stats” over the output topic?
----
2020-03-06 17:38:36 UTC - Sijie Guo: Are you using the thread runtime or the
process runtime?
----
2020-03-06 17:39:33 UTC - Sijie Guo: Oh nvm I see the settings in the function
worker config you attached.
----
2020-03-06 17:40:12 UTC - Guilherme Perinazzo: Tried with both batching and
without, no difference. Increasing the queue size for the producer stops the
error.
It happens when I try producing about 1 million messages in a few seconds, so
it makes sense that the queue is the issue; I just need to figure out why I'm
not seeing the producer queue full error
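For anyone hitting the same thing, a minimal sketch of the two producer knobs
involved, shown with the Java client (the C API exposes equivalent settings);
the topic name and queue size are illustrative:
```// Either block the caller when the pending queue fills up (backpressure),
// or enlarge the queue so it can absorb bursts.
Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")            // illustrative topic
        .blockIfQueueFull(true)       // block instead of failing the send
        .maxPendingMessages(10000)    // illustrative size for burst traffic
        .create();```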
----
2020-03-06 17:41:21 UTC - Vladimir Shchur: Right now stats are `Topic not
found` since it was deleted, let me send some messages there again
----
2020-03-06 17:42:48 UTC - Vladimir Shchur:
```root@pulsar-broker-f856dc4fb-g5l2b:/pulsar# bin/pulsar-admin topics stats
<persistent://public/default/exclamation-input>
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"averageMsgSize" : 0.0,
"storageSize" : 61,
"backlogSize" : 0,
"publishers" : [ ],
"subscriptions" : { },
"replication" : { },
"deduplicationStatus" : "Disabled",
"bytesInCounter" : 61,
"msgInCounter" : 1
}```
Now it looks like this
----
2020-03-06 17:43:31 UTC - Vladimir Shchur: I send the message via
```bin/pulsar-client produce <persistent://public/default/exclamation-input> \
--num-produce 1 \
--messages "Hello world"```
----
2020-03-06 17:46:44 UTC - Chris Bartholomew: @Vladimir Shchur after you updated
the setting to enable stream storage (`PULSAR_PREFIX_extraServerComponents:
org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent`), did you
restart BookKeeper?
----
2020-03-06 17:47:14 UTC - Vladimir Shchur: Yes, I recreated the whole cluster
many times
----
2020-03-06 17:49:05 UTC - Chris Bartholomew: Can you try a simple Python
function to see if that will run?
----
2020-03-06 17:49:33 UTC - Chris Bartholomew: I just want to know if it goes
into the "Running" state
----
2020-03-06 17:49:35 UTC - Vladimir Shchur:
```root@pulsar-broker-f856dc4fb-g5l2b:/pulsar# bin/pulsar-admin topics stats
<persistent://public/default/exclamation-output>
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"averageMsgSize" : 0.0,
"storageSize" : 0,
"backlogSize" : 0,
"publishers" : [ ],
"subscriptions" : {
"my-subscription" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"msgRateRedeliver" : 0.0,
"msgBacklog" : 0,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"consumers" : [ ],
"isReplicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"bytesInCounter" : 0,
"msgInCounter" : 0
}```
This is for the output topic
----
2020-03-06 17:50:12 UTC - Vladimir Shchur: I only run Java functions (both
exclamation and word-count) and both are in the Running state
----
2020-03-06 17:52:56 UTC - Chris Bartholomew: Python functions tend to fail to
run when they can't contact the state storage, whereas Java functions keep
running. I just want to see if any function on the function worker can reach
the state storage.
----
2020-03-06 17:53:47 UTC - Chris Bartholomew: In my current setup, my Java
functions are running, but none of the Python ones are because the state
storage is down.
----
2020-03-06 17:54:57 UTC - Vladimir Shchur: Ok, let me try
----
2020-03-06 17:55:11 UTC - Chris Bartholomew: Thanks
----
2020-03-06 17:59:27 UTC - Vladimir Shchur: You were right - it fails with
BKGrpcCallError
----
2020-03-06 17:59:45 UTC - Vladimir Shchur: grpc_status_code =
StatusCode.UNAVAILABLE, bk_status_code = None : failed to connect to all
addresses
----
2020-03-06 18:03:32 UTC - Chris Bartholomew: OK. Then we should confirm the
stateStorageServiceUrl setting in the function worker. You can see this from
the logs.
----
2020-03-06 18:06:26 UTC - Vladimir Shchur: <https://pastebin.com/raw/5Q8UYSfu>
- shows up as `"stateStorageServiceUrl" : "<bk://pulsar-bookkeeper:4181>"`
----
2020-03-06 18:07:28 UTC - Martina Oefelein: Pulsar newbie here.
----
2020-03-06 18:07:41 UTC - Chris Bartholomew: OK, then we need to look at the
BookKeeper logs to see if the stream storage component is running
----
2020-03-06 18:10:14 UTC - Vladimir Shchur: Hmm, I see the logs show port 3181
----
2020-03-06 18:10:50 UTC - Martina Oefelein: Trying to get pulsar manager
running.
I started pulsar via Docker:
```docker run -it -p 6650:6650 -p 8080:8080 --mount
source=pulsardata,target=/pulsar/data --mount
source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar bin/pulsar
standalone```
And Pulsar manager:
```docker run -it -p 9527:9527 -e REDIRECT_HOST=<http://192.168.0.104> -e
REDIRECT_PORT=9527 -e DRIVER_CLASS_NAME=org.postgresql.Driver -e
URL='jdbc:postgresql://127.0.0.1:5432/pulsar_manager' -e USERNAME=pulsar -e
PASSWORD=pulsar -e LOG_LEVEL=DEBUG -v $PWD:/data
apachepulsar/pulsar-manager:v0.1.0 /bin/sh```
After login, I get a screen with a "+ Environment" button and an empty table.
I click the "+ Environment" button, and it asks for an Environment name and
Service URL. What do I have to enter here?
I tried different things for the service URL (localhost, localhost:8080,
localhost:6650), but I always get:
> *error*
> This environment is error. Please check it
(sic!)
(I assume the Environment name doesn't matter as long as it is not empty, but
just to be safe, I also tried "pulsar" and "default". Same result.)
----
2020-03-06 18:11:21 UTC - Vladimir Shchur: ```[conf/pulsar_env.sh] Applying
config PULSAR_EXTRA_OPTS = -Dpulsar.log.root.level=info
[conf/pulsar_env.sh] Applying config PULSAR_MEM = "-Xms128m -Xmx256m
-XX:MaxDirectMemorySize=128m -Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024 -XX:+UseG1GC -XX:MaxGCPauseMillis=10
-XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions
-XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32
-XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC
-XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError -XX:+PerfDisableSharedMem
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime
-XX:+PrintHeapAtGC -verbosegc -XX:G1LogLevel=finest"
[conf/pulsar_env.sh] Adding config extraServerComponents =
org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
[conf/bkenv.sh] Adding config extraServerComponents =
org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
[conf/bookkeeper.conf] Applying config autoRecoveryDaemonEnabled = false
[conf/bookkeeper.conf] Applying config statsProviderClass =
org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
[conf/bookkeeper.conf] Applying config useHostNameAsBookieID = true
[conf/bookkeeper.conf] Applying config zkServers =
pulsar-zookeeper-0.pulsar-zookeeper
[conf/bookkeeper.conf] Adding config extraServerComponents =
org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
[conf/pulsar_env.sh] Applying config PULSAR_EXTRA_OPTS =
-Dpulsar.log.root.level=info
[conf/pulsar_env.sh] Applying config PULSAR_MEM = "-Xms128m -Xmx256m
-XX:MaxDirectMemorySize=128m -Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024 -XX:+UseG1GC -XX:MaxGCPauseMillis=10
-XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions
-XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32
-XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC
-XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError -XX:+PerfDisableSharedMem
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime
-XX:+PrintHeapAtGC -verbosegc -XX:G1LogLevel=finest"
[conf/pulsar_env.sh] Applying config PULSAR_EXTRA_OPTS =
-Dpulsar.log.root.level=info
[conf/pulsar_env.sh] Applying config PULSAR_MEM = "-Xms128m -Xmx256m
-XX:MaxDirectMemorySize=128m -Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024 -XX:+UseG1GC -XX:MaxGCPauseMillis=10
-XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions
-XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32
-XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC
-XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError -XX:+PerfDisableSharedMem
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime
-XX:+PrintHeapAtGC -verbosegc -XX:G1LogLevel=finest"
0.885: Total time for which application threads were stopped: 0.0002989
seconds, Stopping threads took: 0.0000779 seconds
1.127: Total time for which application threads were stopped: 0.0003517
seconds, Stopping threads took: 0.0000363 seconds
1.171: Total time for which application threads were stopped: 0.0002409
seconds, Stopping threads took: 0.0000440 seconds
1.528: Total time for which application threads were stopped: 0.0002747
seconds, Stopping threads took: 0.0000487 seconds
2.554: Total time for which application threads were stopped: 0.0091567
seconds, Stopping threads took: 0.0089983 seconds
3.028: Total time for which application threads were stopped: 0.0006205
seconds, Stopping threads took: 0.0000164 seconds
3.906: Total time for which application threads were stopped: 0.0004473
seconds, Stopping threads took: 0.0000232 seconds
4.906: Total time for which application threads were stopped: 0.0003322
seconds, Stopping threads took: 0.0000277 seconds
5.254: Total time for which application threads were stopped: 0.0003384
seconds, Stopping threads took: 0.0000165 seconds
5.258: Total time for which application threads were stopped: 0.0004984
seconds, Stopping threads took: 0.0000219 seconds
10:22:09.514 [main] INFO org.apache.bookkeeper.server.Main - Using
configuration file /pulsar/conf/bookkeeper.conf
5.428: Total time for which application threads were stopped: 0.0003608
seconds, Stopping threads took: 0.0000143 seconds
10:22:09.545 [main] INFO org.apache.bookkeeper.server.Main - Hello, I'm your
bookie, listening on port 3181. Metadata service uri is
<zk+null://pulsar-zookeeper-0.pulsar-zookeeper/ledgers>. Journals are in
[data/bookkeeper/journal]. Ledgers are stored in data/bookkeeper/ledgers.
10:22:09.588 [main] INFO org.apache.bookkeeper.server.Main - Load lifecycle
component : org.apache.bookkeeper.server.service.StatsProviderService
5.497: Total time for which application threads were stopped: 0.0006078
seconds, Stopping threads took: 0.0000168 seconds
10:22:09.623 [main] INFO org.apache.bookkeeper.proto.BookieServer - {
"readBufferSizeBytes" : "4096",
"statsProviderClass" :
"org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider",
"majorCompactionThreshold" : "0.5",
"numJournalCallbackThreads" : "8",
"httpServerPort" : "8000",
"lostBookieRecoveryDelay" : "0",
"journalAlignmentSize" : "4096",
"compactionRateByBytes" : "1000000",
"httpServerClass" : "org.apache.bookkeeper.http.vertx.VertxHttpServer",
"dbStorage_rocksDB_numFilesInLevel0" : "4",
"minUsableSizeForIndexFileCreation" : "1073741824",
"gcOverreplicatedLedgerWaitTime" : "86400000",
"journalMaxGroupWaitMSec" : "1",
"dbStorage_rocksDB_numLevels" : "-1",
"dbStorage_rocksDB_bloomFilterBitsPerKey" : "10",
"ledgerStorageClass" :
"org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage",
"auditorPeriodicBookieCheckInterval" : "86400",
"gcWaitTime" : "900000",
"compactionRate" : "1000",
"fileInfoFormatVersionToWrite" : "0",
"entryLogFilePreallocationEnabled" : "true",
"journalSyncData" : "true",
"zkServers" : "pulsar-zookeeper-0.pulsar-zookeeper",
"compactionRateByEntries" : "1000",
"dbStorage_rocksDB_maxSizeInLevel1MB" : "256",
"diskCheckInterval" : "10000",
"auditorPeriodicCheckInterval" : "604800",
"dbStorage_rocksDB_writeBufferSizeMB" : "64",
"autoRecoveryDaemonEnabled" : "false",
"maxPendingAddRequestsPerThread" : "10000",
"majorCompactionInterval" : "86400",
"httpServerEnabled" : "false",
"flushInterval" : "60000",
"journalFlushWhenQueueEmpty" : "false",
"minorCompactionInterval" : "3600",
"dbStorage_rocksDB_blockCacheSize" : "",
"isThrottleByBytes" : "false",
"numAddWorkerThreads" : "0",
"dbStorage_rocksDB_sstSizeInMB" : "64",
"journalDirectory" : "data/bookkeeper/journal",
"journalWriteBufferSizeKB" : "64",
"extraServerComponents" :
"org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent",
"diskUsageThreshold" : "0.95",
"openFileLimit" : "0",
"prometheusStatsHttpPort" : "8000",
"zkLedgersRootPath" : "/ledgers",
"journalMaxSizeMB" : "2048",
"journalAdaptiveGroupWrites" : "true",
"openLedgerRereplicationGracePeriod" : "30",
"ledgerDirectories" : "data/bookkeeper/ledgers",
"zkTimeout" : "30000",
"dbStorage_rocksDB_blockSize" : "65536",
"journalMaxBackups" : "5",
"maxPendingReadRequestsPerThread" : "2500",
"useHostNameAsBookieID" : "true",
"rereplicationEntryBatchSize" : "100",
"allowLoopback" : "false",
"readOnlyModeEnabled" : "true",
"journalRemoveFromPageCache" : "true",
"dbStorage_readAheadCacheMaxSizeMb" : "",
"zkEnableSecurity" : "false",
"numHighPriorityWorkerThreads" : "8",
"dbStorage_readAheadCacheBatchSize" : "1000",
"journalFormatVersionToWrite" : "5",
"writeBufferSizeBytes" : "65536",
"bookiePort" : "3181",
"dbStorage_writeCacheMaxSizeMb" : "",
"pageLimit" : "0",
"logSizeLimit" : "1073741824",
"advertisedAddress" : "",
"bookieDeathWatchInterval" : "1000",```
----
2020-03-06 18:11:32 UTC - Vladimir Shchur: Those are the initial BookKeeper logs
----
2020-03-06 18:14:15 UTC - Chris Bartholomew: Do you see a log like this in BK:
```[conf/pulsar_env.sh] Adding config extraServerComponents =
org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent```
----
2020-03-06 18:15:07 UTC - Vladimir Shchur: Yes, that log line exists. Let me
redeploy everything with the proper port
----
2020-03-06 18:17:21 UTC - Chris Bartholomew: OK. With an incorrect BK setup, I
was seeing Java functions running but not connecting to topics and Python
functions crashing. Once I fixed the BK setup, all functions were running and
the Java functions were able to process messages.
----
2020-03-06 18:18:18 UTC - Sijie Guo: If the topic didn't exist before, that
means the topic was not created because the Java instance is not set up.
I would suggest disabling state first: make sure the function worker works as
normal before enabling state.
----
2020-03-06 18:21:08 UTC - Tobias Macey: For deploying Pulsar on "bare metal",
is it necessary to run both the BookKeeper `bookkeeper shell metaformat` _and_
the Pulsar `initialize-cluster-metadata` commands? Or is it sufficient to only
run the Pulsar command?
----
2020-03-06 18:21:21 UTC - Sijie Guo: what is the error you encountered?
----
2020-03-06 18:21:45 UTC - Sijie Guo: You can just run the pulsar command
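For completeness, a sketch of that command; the cluster name and host names
are illustrative:
```bin/pulsar initialize-cluster-metadata \
  --cluster my-cluster \
  --zookeeper zk1:2181 \
  --configuration-store zk1:2181 \
  --web-service-url http://broker1:8080 \
  --broker-service-url pulsar://broker1:6650```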
----
2020-03-06 18:22:02 UTC - Tobias Macey: Thank you for that clarification
----
2020-03-06 18:30:20 UTC - Martina Oefelein: See updated post
(This "Enter to send" thing drives me crazy)
----
2020-03-06 18:34:36 UTC - Vladimir Shchur: OK. I redeployed everything with
the correct port, but it's still not working. Can you please check if you see
anything?
BK logs: <https://pastebin.com/raw/azG17nt6>
Worker logs: <https://pastebin.com/raw/CuLMiECJ>
----
2020-03-06 18:38:13 UTC - Liam Condon: @Liam Condon has joined the channel
----
2020-03-06 18:41:52 UTC - Vladimir Shchur: Indeed, it works with state disabled
----
2020-03-06 18:57:03 UTC - Vladimir Shchur: Function error says
grpc_status_code = StatusCode.UNAVAILABLE, bk_status_code = None : Connection
reset by peer
----
2020-03-06 19:42:31 UTC - Sijie Guo: @Vladimir Shchur :
```"stateStorageServiceUrl" : "<bk://pulsar-bookkeeper:3181>",```
----
2020-03-06 19:42:46 UTC - Sijie Guo: stateStorage is not listening on `3181`
----
2020-03-06 19:43:11 UTC - Sijie Guo: `3181` is the ledger storage port.
----
2020-03-06 19:43:37 UTC - Sijie Guo: If you are configuring state storage, the
port is 4181.
----
2020-03-06 19:45:07 UTC - Sijie Guo: I think there are a couple of things to
get right when setting up a function worker. I typically suggest starting with
things disabled first and enabling them one by one.
Since you are moving the function worker to a separate process, it's better to
start with state disabled and make sure the function worker runs correctly as
a separate process, then look into enabling state.
----
2020-03-06 19:54:35 UTC - Chris Bartholomew: Yes, it definitely needs to point
to port 4181. In my working environment, this is the setting in the BK logs:
``` "extraServerComponents" :
"org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent",```
And this is the setting in the function worker:
``` "stateStorageServiceUrl" : "<bk://useast2-aws-bookkeeper:4181>",```
Note that the host name needs to be the BookKeeper service name in k8s.
----
2020-03-06 20:27:57 UTC - Tobias Macey: Does it matter that I'm using a
separately deployed BookKeeper service from the upstream and not the bundled
version from the Pulsar distribution?
----
2020-03-06 20:33:26 UTC - Vladimir Shchur: well, it was 4181 before that, then
I thought that 3181 was needed, because it is the port exposed by the
bookkeeper service... Looks like port 4181 should be exposed as well
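Something like this hypothetical excerpt of the BookKeeper k8s Service (port
names are illustrative), so both the bookie and state storage ports are
reachable:
```ports:
  - name: bookie
    port: 3181
  - name: state-storage
    port: 4181```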
----
2020-03-06 20:47:18 UTC - Ali Ahmed: <https://github.com/eaba/SharpPulsar>
----
2020-03-06 21:07:07 UTC - Chris Bartholomew: @Vladimir Shchur sorry to let you
get so far down this path, but I am remembering why I built a standalone
statestorage component for Kubernetes. The stream/state storage function in
BookKeeper uses IP address as an identifier, not host name, which is
problematic in k8s where IP addresses change. I made a
<https://github.com/kafkaesque-io/bookkeeper/commit/108db3a99a5f90d559a8435368e5eee880947f3f|change>
to BookKeeper in my fork that enables it to use hostname. I think this is
required to get a reliable state storage in Kubernetes. This is what is in that
custom image I mentioned. You will need to run BookKeeper with this fix or one
like it to prevent the inevitable issue of the function worker losing
connectivity to the state service on BookKeeper when the IP address changes
(e.g. on a helm chart change).
----
2020-03-06 21:37:14 UTC - Vladimir Shchur: I see... Thank you anyway, I will
try it on Monday. Actually, all this work I'm doing is for a conference at the
end of March where I'll be talking about Pulsar, and stateful functions in
particular, so it would be embarrassing to be unable to launch them myself :)
----
2020-03-06 21:40:47 UTC - Chris Bartholomew: FWIW, I have run the standalone
state storage component quite a bit and it works OK. You just need to use that
custom image, which is based on 2.4.2, but that shouldn't matter much.
+1 : Vladimir Shchur
----
2020-03-06 22:04:30 UTC - Sijie Guo: It doesn’t matter
----
2020-03-06 22:05:25 UTC - Tobias Macey: Thank you. I ran the metaformat command
out of curiosity and saw that it was asking if I wanted to overwrite the
metadata, which was informative :slightly_smiling_face:
----
2020-03-06 22:25:33 UTC - Sijie Guo: :slightly_smiling_face:
----
2020-03-07 01:22:00 UTC - Eugen: I ran a couple of simple (single
topic/partition) performance benchmarks, which show a huge throughput gap
between Kafka and Pulsar. These are the EC2 instances used:
```| Role              | Instance type | Count | vCPU | ECU | Memory | Storage           | NIC             |
|-------------------|---------------|-------|------|-----|--------|-------------------|-----------------|
| ZooKeeper         | t2.small      | 3     | 1    | Var | 2 GiB  | EBS Only          | Low to moderate |
| Broker+BookKeeper | i3en.2xlarge  | 3     | 8    | 37  | 64 GiB | 2 x 2500 NVMe SSD | Up to 25Gb      |
| Producer/Consumer | c5.2xlarge    | 2     | 8    | 39  | 16 GiB | EBS Only          | Up to 10Gb      |```
Chris Bartholomew's
<https://kafkaesque.io/performance-comparison-between-apache-pulsar-and-kafka-latency/|Performance
Comparison Between Apache Pulsar and Kafka: Latency> only looks at latency at a
fixed throughput setting (100-byte messages, 50,000 msg/sec), and for that,
Kafka showed lower latency than Pulsar in the simplest case (single
topic/partition).
My tests tried to find the maximum throughput, and these are my findings:
```| fsync, 2 replicas | msg/sec |
|------------------ |-------- |
| Kafka | 14,307 |
| Pulsar | 297,982 |```
- message size: 500 bytes
- exactly-once producer (ordered, no loss)
- Kafka: `enable.idempotence=true`
- Pulsar: `producerBuilder.blockIfQueueFull(true)`
So in this test, Pulsar was 20 times faster. I have to say though that at this
throughput, the values for Pulsar occasionally went down to an average of 120k
for one second and then back up, and the table above contains only the overall
averages. During the test, there was one producer and one consumer, and the
consumer was able to keep up with the produced throughput.
I then retested Kafka with a couple of different settings, to see if there are
ways to improve throughput with more relaxed settings. These are the results:
```| Kafka | msg/sec |
|------------------ |-------- |
| fsync, 1 replicas | 111,265 |
| async, 1 replicas | 316,132 |
| async, 2 replicas | 72,785 |```
So even without synchronous fsync, Kafka is not able to get above 73k, unless
we also decrease the replica setting to 1, i.e. no redundancy. (I only tested
with both `min.insync.replicas` and `default.replication.factor` set to 1,
although I should have kept the latter at 2, which would reduce message loss to
a minimum in case of a broker failure. I was too busy testing, but then, this
would not be a setting we could tolerate in production anyway.)
Seeing my dismal Kafka results, I was baffled that Bartholomew's test with 50k
worked flawlessly, while I could only get 14k throughput. The most obvious
difference I found was the message size: I used 500 bytes (which is what we
have on average), while he used only 100 bytes. And to my surprise, Kafka
fsyncing 100-byte messages is almost 5 times faster than its 500-byte fsyncs
(63,180 vs 14,307 msg/sec)! I would have expected little difference for reasons
of e.g. block size, but I'm no disk i/o expert. So here are the results for
this last test:
```| Kafka, 100 bytes | msg/sec |
|------------------ |-------- |
| fsync, 2 replicas | 63,180 |```
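For reference, a minimal sketch of the Kafka producer settings behind the
"exactly-once" rows above; the broker address and serializers are illustrative:
```// Idempotent Kafka producer: ordered, no duplicates on retry.
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");  // illustrative address
props.put("key.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("enable.idempotence", "true");        // implies acks=all and retries
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);```
On the Pulsar side, the only relevant producer setting was
`blockIfQueueFull(true)`, as noted above.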
----
2020-03-07 01:25:43 UTC - Matteo Merli: @Eugen what are you using to generate
load?
----
2020-03-07 01:26:20 UTC - Eugen: A simple java tool that I wrote
----
2020-03-07 01:27:17 UTC - Eugen: I tried different throughput settings, but
due to the blocking nature of both producers (given the settings I used), I
determined the maximum throughput by just sending as fast as I could,
single-threaded
----
2020-03-07 01:37:04 UTC - Chris Bartholomew: Are you sure you are using the
NVMe SSD disks for both Kafka and Pulsar in your tests?
----
2020-03-07 01:37:08 UTC - Eugen: If anyone has any insight into why a 100-byte
fsync is 5 times faster than a 500-byte fsync on an NVMe SSD with XFS, I'd
like to hear it. At 500 bytes and 15k msg/sec, we are talking about only
~7 MB/sec, so throughput itself should not be an issue.
----
2020-03-07 01:38:27 UTC - Eugen: Kafka:
```log.dirs=/mnt/ssd0/kafka,/mnt/ssd1/kafka```
----
2020-03-07 01:39:35 UTC - Eugen: I used the openmessaging benchmark as a basis
for my tests
+1 : Chris Bartholomew
----
2020-03-07 01:40:24 UTC - Sijie Guo: @Eugen
Regarding the 100-byte fsync being 5 times faster than the 500-byte one: is
batching playing an important role here?
----
2020-03-07 01:40:28 UTC - Eugen: But I used a cheaper `i3en.2xlarge`, instead
of the `i3.4xlarge` that you used, which actually has 2 larger NVMe SSDs.
----
2020-03-07 01:43:54 UTC - Chris Bartholomew: Makes sense. As long as the setup
is the same for Kafka and Pulsar, then it is a fair comparison.
----
2020-03-07 01:44:11 UTC - Eugen: My guess as to why Pulsar is so much faster
was indeed that it batches fsyncs (correct me if I'm wrong), but still, I
would have thought that an fsync has a high constant cost, independent of the
number of bytes written, unless of course we were comparing 100 bytes vs. 5kb
or so
----
2020-03-07 01:44:21 UTC - Chris Bartholomew: Both setups using 2 NVMe SSDs.
----
2020-03-07 01:44:53 UTC - Eugen: Well, I'm not sure if it's fair, but I used
the same settings for Pulsar as you did:
```journalDirectories=/mnt/ssd0/pulsar/1,/mnt/ssd0/pulsar/2,/mnt/ssd0/pulsar/3,/mnt/ssd0/pulsar/4
ledgerDirectories=/mnt/ssd1/pulsar```
----
2020-03-07 01:45:19 UTC - Eugen: i.e. for reasons of throughput, there are 4
journal directories
----
2020-03-07 01:46:47 UTC - Eugen: of course Pulsar and Kafka are implemented
completely differently internally, but if I had had more time, I would have
wanted to try something like this, too:
```log.dirs=/mnt/ssd0/kafka/1,/mnt/ssd0/kafka/2,/mnt/ssd0/kafka/3,/mnt/ssd0/kafka/4,/mnt/ssd1/kafka```
----
2020-03-07 01:47:17 UTC - Chris Bartholomew: I can't say whether adding more
directories would help Kafka's performance. I have never tried it.
----
2020-03-07 01:47:34 UTC - Eugen: plus, I doubt it, in a single topic/partition
test
----
2020-03-07 01:47:59 UTC - Eugen: where Kafka will certainly be writing to a
single directory
----
2020-03-07 01:48:07 UTC - Chris Bartholomew: For sure
----
2020-03-07 01:48:33 UTC - Sijie Guo: @Eugen: as you said, the fsync cost is
relatively fixed whether you write 100 bytes or 500 bytes, and the fsync time
determines how fast a single thread can fsync the data. If you can batch more
data into one single fsync call, you are able to achieve higher QPS, no?
Kafka does time-based syncing, so I guess the theory applies there, since you
are able to batch more data into one fsync interval.
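To put hypothetical numbers on it: if one fsync takes ~1 ms, a single thread
caps out at ~1,000 fsyncs/sec, so one message per fsync means ~1,000 msg/sec
regardless of message size, while batching 100 messages into each fsync lifts
the same thread to ~100,000 msg/sec (illustrative figures only).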
----
2020-03-07 01:51:49 UTC - Chris Bartholomew: One thing I remember from doing
the latency tests and some throughput runs is that Kafka really didn't do well
with a single partition. It might be interesting to rerun with a few
partitions to see what difference that makes.
----
2020-03-07 01:52:45 UTC - Eugen: For the fsync tests, I used these settings:
```log.flush.interval.messages=1
log.flush.interval.ms=0```
----
2020-03-07 01:53:19 UTC - Eugen: Otherwise Kafka's ack doesn't mean the data
has made it to the disk
----
2020-03-07 01:55:33 UTC - Sijie Guo: Okay. Does the batching happen at the
client side?
----
2020-03-07 01:56:53 UTC - Eugen: According to your own report, which was on
latency, rather than throughput, the more partitions, the higher the Kafka
latency - in contrast to Pulsar:
```| Latency (msec) | Pulsar 1 | Kafka 1 | Pulsar 6 | Kafka 6 | Pulsar 16 | Kafka 16 |
|----------------|----------|---------|----------|---------|-----------|----------|
| Average        | 9.052    | 7.129   | 8.060    | 13.857  | 3.170     | 21.119   |
| 50th           | 9.0      | 7.0     | 8.0      | 13.0    | 3.0       | 20.0     |
| 75th           | 11.0     | 7.0     | 10.0     | 13.0    | 4.0       | 22.0     |
| 95th           | 13.0     | 8.0     | 13.0     | 15.0    | 4.0       | 25.0     |
| 99th           | 14.0     | 9.0     | 14.0     | 32.0    | 12.0      | 33.0     |
| 99.9th         | 128.0    | 170.0   | 110.0    | 208.0   | 89.0      | 199.0    |
| 99.99th        | 213.0    | 210.0   | 199.0    | 239.0   | 178.0     | 259.0    |
| Maximum        | 267.0    | 243.0   | 253.0    | 287.0   | 255.0     | 334.0    |```
----
2020-03-07 01:58:11 UTC - Eugen: Yes.
```batch.size=16384
linger.ms=1```
----
2020-03-07 01:58:17 UTC - Eugen: But the bottleneck here was the fsync ack at
the server.
----
2020-03-07 01:59:43 UTC - Chris Bartholomew: Yes, but I did run some throughput
tests that I didn't publish. I recall needing to increase partitions on Kafka
to get to higher rates.
----
2020-03-07 02:00:36 UTC - Eugen: Would absolutely make sense. But Pulsar's
throughput will increase as well with more partitions, right?
----
2020-03-07 02:01:58 UTC - Chris Bartholomew: I still think Pulsar will win, but
the gap will be smaller. :slightly_smiling_face: The single partition case may
just be a Kafka performance blind spot. It was, after all, designed with
multiple partitions in mind.
----
2020-03-07 02:03:08 UTC - Eugen: And without synchronous fsync in mind... i.e.
weaker durability guarantees
----
2020-03-07 02:03:26 UTC - Chris Bartholomew: Exactly
----
2020-03-07 02:09:13 UTC - Sijie Guo: Well I don’t think Kafka waits for fsync
----
2020-03-07 02:10:44 UTC - Eugen: All in all, kudos to the Pulsar team for
creating this generic messaging system that can perform at this throughput with
synchronous fsyncs, even while guaranteeing gapless, ordered, exactly-once
message production in the face of broker failures (and even producer failures,
in case the producer sets the sequence id - impossible with Kafka). It speaks
to a thoughtful design!
man-surfing : Matteo Merli, Karthik Ramasamy, Joe Francis, Roman Popenov
clap : Karthik Ramasamy, Roman Popenov, Matt Hino
tada : Karthik Ramasamy, Roman Popenov, Matt Hino, Ali Ahmed
----
2020-03-07 02:11:28 UTC - Sijie Guo: So fsync just reduces the portion of data
that will be lost in the event of a power outage. And it slows down the
throughput
----
2020-03-07 02:11:59 UTC - Sijie Guo: So batching at the client side is actually
helping your Kafka client get higher throughput
----
2020-03-07 02:13:15 UTC - Eugen: I (and I think @Chris Bartholomew, too) think
that the log flush settings above guarantee synchronous fsyncing.
----
2020-03-07 02:18:53 UTC - Matteo Merli: > But Pulsar's throughput will
increase as well with more partitions, right?
Yes, with more partitions the load gets spread to more brokers
+1 : Eugen
----
2020-03-07 02:21:20 UTC - Matteo Merli: One thing to take into account though
is the effect of batching. E.g. if you spread the same throughput across more
partitions, each partition will have a lower msg/s rate and thus the batching
will be reduced.
If you don't need key-routing when publishing, the Pulsar client will try to
optimize the default round-robin distribution by aligning the max-batching
time, in order to maximize the batching
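A sketch of the related knobs on the Java producer builder (all values are
illustrative, not recommendations):
```Producer<byte[]> producer = client.newProducer()
        .topic("partitioned-topic")                            // illustrative
        .enableBatching(true)
        .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)     // max batching time
        .roundRobinRouterBatchingPartitionSwitchFrequency(10)  // keep one partition
                                                               // for ~10 batch windows
        .create();```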
----
2020-03-07 03:20:18 UTC - Eugen: If we are going for max throughput, using
multiple partitions would allow us to increase the msg/s rate (as that was the
reason for trying multiple partitions in the first place), so batching should
still be pretty effective.
----
2020-03-07 03:26:45 UTC - Eugen: @Chris Bartholomew
> I recall needing to increase partitions on Kafka to get to higher rates.
Looking just at a single machine and single SSD, I wonder if the number of
fsyncs/sec is greater when they are run in parallel. For HDDs, that would
certainly not be the case.
----
2020-03-07 03:33:29 UTC - Sijie Guo: I don’t think so. The fsync in Kafka is
decoupled from acknowledgement; those settings just control when to fsync. If
you used those settings from the OpenMessaging benchmark, they were put there
just because they are the closest settings compared to BookKeeper’s durability
settings. They don’t mean Kafka guarantees synchronous fsyncing.
----
2020-03-07 03:33:49 UTC - Sijie Guo: But I could be wrong.
----
2020-03-07 03:36:43 UTC - Eugen: Interesting. If that is indeed the case, then
even at 20 times worse throughput, Kafka wouldn't have the same strong
guarantees Pulsar gives, and when a broker fails, message loss would not be
unlikely in Kafka.
----
2020-03-07 08:34:26 UTC - Jan-Pieter George: Could someone help us by pointing
to docs or something for the broker-side redelivery policies? I couldn't find
any documentation on how to configure this: what are the settings/timeouts to
configure for when a consumer never acknowledges a message? We've noticed such
messages do end up returning to the subscription for processing, but we
haven't found a setting for it.
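(In case it helps: as far as I know, the main knob here is client-side rather
than broker-side; the Java consumer's ack timeout controls when unacked
messages get redelivered. A minimal sketch, topic and subscription names
illustrative:
```Consumer<byte[]> consumer = client.newConsumer()
        .topic("my-topic")                  // illustrative
        .subscriptionName("my-sub")         // illustrative
        .ackTimeout(30, TimeUnit.SECONDS)   // redeliver messages unacked after 30s
        .subscribe();```
)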
----