2019-03-20 10:53:42 UTC - Sébastien de Melo: Same message in standalone.
I have run the following commands (directly in the container):
bin/pulsar-perf simulation-client --port 5678 --service-url pulsar://localhost:6650
bin/pulsar-perf simulation-controller --cluster standalone --client-port 5678 --clients localhost
> simulate localhost:2181
@Sijie Guo @Matteo Merli @David Kjerrumgaard Is this expected or am I doing
something wrong?
----
2019-03-20 11:17:58 UTC - Alexandre DUVAL: Really cool!!
----
2019-03-20 12:20:53 UTC - Shivji Kumar Jha: @Sijie Guo Hi, I am using avro
schema and produced some data
When I consume, I see this:
Message msg = consumer.receive();
System.out.println( msg.getSchemaVersion());
schema version always prints null.
----
2019-03-20 12:22:58 UTC - Shivji Kumar Jha: My expectation was this:
1. Producer will encode message version in the message
2. Consumer on receive will get the message version in (Message.java) object
3. Consumer will fetch message from schema registry by topic name and version
of schema.
4. consumer will decode using this schema.
----
2019-03-20 12:23:31 UTC - Shivji Kumar Jha: This is in latest client code
----
2019-03-20 12:26:57 UTC - Sijie Guo: @Shivji Kumar Jha ah, I think version is
only exposed when using GenericRecord schema. basically AUTO_CONSUME.
I don’t think it is exposed to POJO schema yet. But there is a working item
from @CongBo to expose the schema version for POJO schema.
The reason why I didn’t expose that in POJO schema, is currently the POJO
schema is only one fixed version. and we are using same schema for both (writer
and reader) schema. The version is only useful when
<https://github.com/apache/pulsar/issues/3742> is implemented.
----
2019-03-20 12:27:58 UTC - Sijie Guo: actually nvm
----
2019-03-20 12:28:13 UTC - Sijie Guo: I think I misread your question
----
2019-03-20 12:29:50 UTC - Sijie Guo: @Shivji Kumar Jha can you show me your
producer code?
----
2019-03-20 12:31:34 UTC - Shivji Kumar Jha: this.internalProducer =
PulsarClientBuilder.getPulsarClient()
.newProducer(getAvroSchema(typeClass))
.topic(topicName)
.producerName(producerConf.getProducerName())
.create();
getAvroSchema(typeClass) is like this:
String schemaJsonString = clz.getField("SCHEMA$").get(null).toString();
return AvroSchema.of(SchemaDefinition.builder().withJsonDef(schemaJsonString).build());
----
2019-03-20 12:31:59 UTC - Shivji Kumar Jha: @Sijie Guo
----
2019-03-20 12:34:37 UTC - Shivji Kumar Jha: Please tell me if something is
unclear.
----
2019-03-20 12:35:07 UTC - Sijie Guo: @Shivji Kumar Jha it is clear to me. both
producer and consumer are using the latest master code, right?
----
2019-03-20 12:35:19 UTC - Shivji Kumar Jha: yes
----
2019-03-20 12:35:48 UTC - Shivji Kumar Jha: the broker daemon is 2.3.0
----
2019-03-20 12:40:50 UTC - Sijie Guo: interesting.. your code seems to be fine.
give me a few minutes. let me try something quickly.
----
2019-03-20 12:56:37 UTC - Shivji Kumar Jha: AUTO_CONSUME , as far as i see,
always returns the latest schema.. for schema compatibility = none usecase that
wouldnt work.
----
2019-03-20 12:58:50 UTC - Shivji Kumar Jha:
message.msgMetadataBuilder.getSchemaVersion() is always “” (EMPTY STRING)
somehow. This i think is the issue..
----
2019-03-20 12:58:53 UTC - Shivji Kumar Jha: @Sijie Guo
----
2019-03-20 13:00:01 UTC - Shivji Kumar Jha:
this.msgMetadataBuilder.hasSchemaVersion() would then return false and thence
msg.getSchemaVersion is null
----
2019-03-20 13:01:16 UTC - Sijie Guo: I see. so there is a multi version
generic schema and schema provider implementation. @Penghui Li was adding that
piece of code. I guess there is one task missing is to wire this multi version
generic schema with AUTO_CONSUME.
----
2019-03-20 13:01:35 UTC - Sijie Guo: yes. was able to see the same thing here.
+1 : Yuvaraj Loganathan, Shivji Kumar Jha
----
2019-03-20 13:33:00 UTC - Daniel Ferreira Jorge: @Sijie Guo thanks, I will take
a look on the code!
----
2019-03-20 13:34:19 UTC - Daniel Ferreira Jorge: @David Kjerrumgaard Thanks for
the suggestion, I will take a look on that!
----
2019-03-20 13:37:09 UTC - Sijie Guo: @Shivji Kumar Jha I think the problem is
coming from batch message. the metadata is not propagated correctly in batched
message.
----
2019-03-20 13:37:21 UTC - Sijie Guo: if disabling batch, it is working
----
2019-03-20 13:37:34 UTC - Sijie Guo: sending out a patch for it
bananadance : Shivji Kumar Jha, Yuvaraj Loganathan
----
2019-03-20 13:45:29 UTC - Shivji Kumar Jha: how can we disable this batch
messaging in java client?
----
2019-03-20 13:46:25 UTC - David Kjerrumgaard: @Sébastien de Melo It looks
like there is something wrong with your metadata inside ZK.
`org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode
for /loadbalance/resource-quota/namespace`
----
2019-03-20 13:47:11 UTC - David Kjerrumgaard: @Sébastien de Melo Can you share
the command you are using?
----
2019-03-20 13:50:11 UTC - Sijie Guo: enableBatching(false)
----
2019-03-20 13:50:14 UTC - Sijie Guo: in producer side
----
2019-03-20 13:52:11 UTC - Sébastien de Melo: @David Kjerrumgaard In standalone
I have just run:
bin/pulsar-perf simulation-client --port 5678 --service-url pulsar://localhost:6650
to start the client, then
bin/pulsar-perf simulation-controller --cluster standalone --client-port 5678 --clients localhost
and
simulate localhost:2181
in the controller.
Did I miss anything?
----
2019-03-20 13:54:35 UTC - David Kjerrumgaard: I've never run the simulation
tools on a standalone instance. So maybe there are some issues with that
environment. Let me try to reproduce the issue
----
2019-03-20 13:57:20 UTC - Sébastien de Melo: Sure, thank you.
I encountered the issue on EKS (deployed with helm) in the first place. I
wanted to try on a simpler environment to see if it happened too.
----
2019-03-20 13:58:42 UTC - Sijie Guo: @Shivji Kumar Jha: shared the change
<https://github.com/apache/pulsar/pull/3870>
I need to add unit tests for it.
----
2019-03-20 13:59:22 UTC - Shivji Kumar Jha: Thats quick! thank you
:slightly_smiling_face:
----
2019-03-20 13:59:44 UTC - David Kjerrumgaard: Thanks for that piece of
information
----
2019-03-20 13:59:59 UTC - David Kjerrumgaard: maybe there is a missing step in
our documentation.
----
2019-03-20 14:33:33 UTC - Shivji Kumar Jha: @Sijie Guo How can i convert
(byte[] schemaversionId) to a schema ID string? It seems the normal java way
doesnt work. Is there some encoding hidden there?
----
2019-03-20 15:24:14 UTC - Sijie Guo: it was supposed to be transparent to the
user. user can store the byte[] and use the byte[] version to retrieve the
schema. so that the schema registry can be pluggable.
but it turns out we exposed the implementation details at rest admin api. the
current implementation is a long number version.
you can deserialize it using ByteBuffer.getLong
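
A minimal sketch of that conversion, assuming the version bytes are exactly 8 bytes in big-endian order (the default byte order of `ByteBuffer`). The class and method names are hypothetical, not part of the Pulsar client:

```java
import java.nio.ByteBuffer;

final class SchemaVersionBytes {
    // Assumption: the schema version is an 8-byte big-endian long,
    // as the "long number version" described above suggests.
    static long toLong(byte[] schemaVersion) {
        if (schemaVersion == null || schemaVersion.length != Long.BYTES) {
            throw new IllegalArgumentException("expected exactly 8 bytes");
        }
        return ByteBuffer.wrap(schemaVersion).getLong();
    }

    public static void main(String[] args) {
        // Build a sample 8-byte version locally instead of reading a real message.
        byte[] raw = ByteBuffer.allocate(Long.BYTES).putLong(3L).array();
        System.out.println(toLong(raw)); // prints 3
    }
}
```

`new String(bytes)` does not work here because the bytes are a binary number, not encoded text.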
----
2019-03-20 15:30:05 UTC - Daniel Ferreira Jorge: Hi again! We are trying to
move more of our production workload from kafka to pulsar and I have some more
questions:
1. Is it possible to specify a dead letter topic in the python client?
2. Is it possible to query the functions state from the REST API?
3. How does pulsar functions handle deduplication when fanning out messages?
(e.g. suppose that I'm consuming from single topic named `tables` and producing
to a single topic named `rows`, each message from `tables` will produce X >
1 messages to the `rows` topic. I can see this will be easy with `PIP-31` but
for now should I assume at least once in this particular case?)
4. Any rough ETA for PIP-31?
Thanks!!
----
2019-03-20 15:40:14 UTC - Sijie Guo: > 1. Is it possible to specify a dead
letter topic in the python client?
I don’t think it is available at python yet.
> Is it possible to query the functions state from the REST API?
yes that is available.
> How does pulsar functions handle deduplication when fanning out messages?
if your function is an idempotent function, ideally you are able to achieve
deduplication. but currently the exactly once is only applied on the result. if
you are producing multiple results, you have to do that using context.produce.
you can use the msgid to generate sequence ids on the results to guarantee
deduplication.
PIP-31 is more about atomic write.
> Any rough ETA for PIP-31?
we are planning to start the work in one or two weeks or so. it might take
1-2 months to get a first version (depending on how many people can dedicate
time to this). although it might take a while to get it battle tested.
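
One way the "sequence ids from the msgid" idea could look, as a rough sketch only. It assumes the input message id has already been mapped to an increasing `long` (that mapping is left open), and `MAX_FAN_OUT` is a hypothetical cap chosen for illustration, not a Pulsar setting:

```java
final class FanOutSequenceIds {
    // Hypothetical upper bound on results produced per input message.
    static final int MAX_FAN_OUT = 1024;

    // Derives a deterministic sequence id for the resultIndex-th output of one input.
    // Reprocessing the same input yields the same ids, so the broker can drop repeats.
    static long sequenceIdFor(long inputSequence, int resultIndex) {
        if (inputSequence < 0) {
            throw new IllegalArgumentException("inputSequence must be non-negative");
        }
        if (resultIndex < 0 || resultIndex >= MAX_FAN_OUT) {
            throw new IllegalArgumentException("resultIndex out of range");
        }
        // multiplyExact/addExact fail loudly on overflow instead of wrapping around.
        return Math.addExact(Math.multiplyExact(inputSequence, (long) MAX_FAN_OUT), resultIndex);
    }

    public static void main(String[] args) {
        // Input 7 fans out into three results with ids 7168, 7169, 7170.
        for (int i = 0; i < 3; i++) {
            System.out.println(sequenceIdFor(7L, i));
        }
    }
}
```

The ids stay strictly increasing across inputs because each input owns its own block of `MAX_FAN_OUT` values.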
----
2019-03-20 15:49:19 UTC - Shivji Kumar Jha: Oh you are amazing :blush: thank
you. Must be late for you... Sorry about that!
----
2019-03-20 15:57:10 UTC - Mark Marijnissen: `pulsar-broker` crashes with "Not
enough non-faulty bookies available" and tells me in the logs "Failed to
find 1 bookies : excludeBookies [ nr1, nr 2 ]"
I check my 2 bookies, they're fine.
Why did pulsar-broker decide to exclude my bookies?
----
2019-03-20 16:11:42 UTC - Daniel Ferreira Jorge: Thanks again for the answers
Sijie!
----
2019-03-20 16:20:05 UTC - Daniel Ferreira Jorge: @Sijie Guo could you point to
me the state query rest api call docs? I could not find it...
----
2019-03-20 16:35:04 UTC - Sanjeev Kulkarni:
<https://github.com/apache/pulsar/blob/master/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java#L381>
----
2019-03-20 16:51:38 UTC - Daniel Ferreira Jorge: @Sanjeev Kulkarni Thanks, it
helped!
----
2019-03-20 17:27:16 UTC - Matteo Merli: It’s possible these are already being
used for a given ledger.
How many bookies you have in total?
What is the ensemble size you’re using?
----
2019-03-20 17:27:26 UTC - Laurent Chriqui: Hello, question regarding the python
pulsar functions. What happens when an exception is raised (example HTTP
timeout) through the message processing ? I have seen that the message is not
acked, but how do we retry to process that message without restarting the
function ?
----
2019-03-20 17:30:37 UTC - Jerry Peng: @Laurent Chriqui in your function config
you can set the field ```timeoutMs``` which specifies the max time a message
can be unacknowledged before it gets resent by the broker
----
2019-03-20 17:31:05 UTC - Jerry Peng: example function config:
```
name: py-function
tenant: public
namespace: default
py: pulsar-functions/python-examples/test_function.py
className: test_function.TestFunction
inputSpecs:
  persistent://public/default/py-input:
    receiverQueueSize: 89
output: persistent://public/default/py-output
timeoutMs: 60000
parallelism: 3
cleanupSubscription: true
```
In the above example, the ack timeout is set to 1 minute (60000 milliseconds)
----
2019-03-20 17:33:11 UTC - Jerry Peng: @Laurent Chriqui we are also in the
process of supporting functions to use negative acknowledgements that was
recently added in pulsar, so that when an error occurs in a function, the
function will send a neg-ack to the broker and the broker will be signaled to
redeliver the message
----
2019-03-20 17:40:50 UTC - Laurent Chriqui: The timeout is for when the function
doesn’t respond in time, right? And that is equivalent to an exception
being raised by the function, but then the only way to re-process the message
is to restart the function. Do I understand this correctly?
----
2019-03-20 17:45:45 UTC - Jerry Peng: @Laurent Chriqui no. Specifying the
config ```timeoutMs``` will allow messages to be redelivered and thus
reprocessed if the message is not acked in the function within the specified
timeout period. A function won’t ack a message if there was an exception
thrown so by specifying ```timeoutMs```, messages will be redelivered when
there is an exception raised in functions. Set ```timeoutMs``` to be a value
perhaps 2x the normal processing time of a message for your function
----
2019-03-20 17:47:10 UTC - Laurent Chriqui: I don’t see the timeoutMs in the
docs <https://pulsar.apache.org/docs/en/pulsar-admin/#create-1>
----
2019-03-20 17:49:14 UTC - Jerry Peng: @Laurent Chriqui the documentation was
not updated but the option exists:
```
@Parameter(names = "--timeout-ms", description = "The message timeout in
milliseconds")
```
----
2019-03-20 17:49:33 UTC - Laurent Chriqui: ok thank you!
----
2019-03-20 17:50:17 UTC - Jerry Peng: @Laurent Chriqui feel free to submit a
quick PR to update the documentation :slightly_smiling_face:
----
2019-03-20 17:50:53 UTC - Laurent Chriqui: ok :slightly_smiling_face:
----
2019-03-20 18:09:08 UTC - vinay Parekar: Hi guys.
----
2019-03-20 18:09:45 UTC - vinay Parekar: i am trying to create a kafka source
for a kerberized kafka cluster
----
2019-03-20 18:10:10 UTC - vinay Parekar: i am hitting a wall where i am not able
to pass the krb5.conf file to authenticate
----
2019-03-20 18:11:03 UTC - vinay Parekar: is there any way i can pass
-Djava.security.krb5.conf option
----
2019-03-20 18:11:20 UTC - vinay Parekar: so jvm will know where to look for
krb5.conf file
----
2019-03-20 18:11:25 UTC - vinay Parekar: any suggestions
----
2019-03-20 18:22:15 UTC - Thor Sigurjonsson: I'm working alongside @vinay
Parekar on this, more details: We see the `java -cp ...` process on the system
where the function running the kafka source is. We don't see the
`-Djava.security.krb5.conf` option on that process on the system. Tried putting
it in the conf/pulsar_env.sh file.
----
2019-03-20 18:23:22 UTC - Matteo Merli: are you running the functions in
process or thread mode?
----
2019-03-20 18:23:33 UTC - Matteo Merli: (or k8s..)
----
2019-03-20 18:23:38 UTC - Ryan Samo: Hey guys,
If you are using a partitioned topic vs a standard topic, will the subscription
modes still work the same? Like if I setup my consumer in failover mode, will
it work?
----
2019-03-20 18:23:59 UTC - Matteo Merli: Yes, everything works in about the same
way
----
2019-03-20 18:24:30 UTC - Matteo Merli: only difference is that with
partitions, the “active” consumer is elected on a per-partition base
----
2019-03-20 18:24:44 UTC - Matteo Merli: to spread the partitions across
available consumers
----
2019-03-20 18:26:04 UTC - Ryan Samo: Cool, so if I have 20 partitions and 3
consumers in failover mode, that works out to be 60 consumers but only 20
active?
----
2019-03-20 18:27:15 UTC - Matteo Merli: correct, each consumer tries to be
“active” on all the partitions, then brokers decide
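
The counts in this exchange can be checked with a small model. It is a sketch only: the election rule used here (partition index modulo the number of consumers) is an assumption for illustration, and the brokers may pick the active consumer differently:

```java
final class FailoverPartitions {
    // For each partition, returns the index of the consumer assumed to be active.
    // Assumption: simple round-robin by partition index, not the broker's actual rule.
    static int[] electActive(int partitions, int consumers) {
        if (partitions < 0 || consumers <= 0) {
            throw new IllegalArgumentException("need partitions >= 0 and consumers > 0");
        }
        int[] active = new int[partitions];
        for (int p = 0; p < partitions; p++) {
            active[p] = p % consumers;
        }
        return active;
    }

    public static void main(String[] args) {
        int partitions = 20;
        int consumers = 3;
        int[] active = electActive(partitions, consumers);
        // Every consumer attaches to every partition: 20 * 3 = 60 attachments.
        System.out.println("attached consumers: " + partitions * consumers); // prints 60
        // Exactly one attachment per partition is active.
        System.out.println("active consumers: " + active.length); // prints 20
    }
}
```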
----
2019-03-20 18:27:34 UTC - Ryan Samo: Nice! Thanks @Matteo Merli
----
2019-03-20 18:27:47 UTC - Matteo Merli: (if you use Shared, it will be
round-robin on each partition)
----
2019-03-20 18:28:12 UTC - Matteo Merli: and exclusive will mean: “I want to be
the exclusive consumer on all the partitions, or fail”
----
2019-03-20 18:29:17 UTC - Ryan Samo: Ok, makes perfect sense now thanks
----
2019-03-20 18:41:55 UTC - Sree Vaddi:
<https://www.meetup.com/SF-Bay-ACM/events/259921891/>
+1 : Jerry Peng, David Kjerrumgaard, Ali Ahmed, Shivji Kumar Jha
----
2019-03-20 19:21:38 UTC - Thor Sigurjonsson: @Matteo Merli We're just running
functions worker out of the broker in the default way.
----
2019-03-20 19:22:28 UTC - Thor Sigurjonsson: in this case we're running the
kafka source using the nar with the release.
----
2019-03-20 19:22:51 UTC - Thor Sigurjonsson: We've got kerberized kafka to talk
to.
----
2019-03-20 19:29:17 UTC - Matteo Merli: @Thor Sigurjonsson I think that by
default is using ProcessRuntime (and it’s not setting the System properties on
the JVM).
You can change that to use ThreadRuntime. See :
```
threadContainerFactory:
  threadGroupName: "Thread Function Container Group"
```
on `conf/functions_worker.yml`
----
2019-03-20 19:29:35 UTC - Matteo Merli: (and you can comment out the
`processContainerFactory` section in there.)
----
2019-03-20 19:29:57 UTC - Matteo Merli: Then you can just set
`-Djava.security.krb5.conf` on the broker/worker JVM process
----
2019-03-20 20:00:36 UTC - Sam Leung: Hi we are doing some capacity planning,
and looking at
<https://github.com/apache/pulsar/wiki/PIP-8:-Pulsar-beyond-1M-topics>, it
states some limits that I didn’t know about (are they documented somewhere?) so
I was wondering if there are recommended limits on things like topics per
namespace (can I have 100k? 1M? What would be the limiting factor?), total peer
groups (can we reasonably support 10M topics with 10 peer clusters?)
----
2019-03-20 20:05:49 UTC - Ali Ahmed: @sam.huang what’s your use case for
supporting 10M topics?
----
2019-03-20 20:16:24 UTC - Shaun Noonan: I'm working with Sam on this project as
well. I think the question of 10M is more an exercise in understanding the
constraints. We do have some use cases in the 10-100k topics range. It would
be helpful to understand how to reason about how we could get there and beyond.
----
2019-03-20 20:16:48 UTC - vinay Parekar: @Matteo Merli is there any behavioural
difference when using threadContainerFactory rather than
processContainerFactory?
----
2019-03-20 20:17:52 UTC - Matteo Merli: Function instances are started as
threads inside the worker/broker JVM process, rather than as a process on their
own
----
2019-03-20 20:24:36 UTC - Shaun Noonan: It's helpful to understand what goes
into getting to those 100k-1M numbers. for example, how does 1000 namespaces x
100 topics compare to 100 namespaces x 1000 topics. Beyond what we can load
test, good to know what we might run into at those sorts of numbers.
----
2019-03-20 20:27:47 UTC - Ali Ahmed: @vinay Parekar the threaded mode is mostly
for cases where one needs a deployment optimized for low memory or latency,
process isolation is nearly always better
----
2019-03-20 20:28:54 UTC - Sanjeev Kulkarni: @vinay Parekar I’ll put up a pr
soon to add the ability to pass on those flags. in the meanwhile try using
thread mode
+1 : Thor Sigurjonsson
----
2019-03-20 20:29:58 UTC - vinay Parekar: sure @Ali Ahmed @Sanjeev Kulkarni
thanks!
----
2019-03-20 20:41:48 UTC - Sanjeev Kulkarni: @Sijie Guo the integration tests
for functions have become a lot flakier after the recent avro changes. Here is
a stack trace for one such failure, running in threaded mode
----
2019-03-20 20:45:31 UTC - Sanjeev Kulkarni: ‘currently schema detection only
works for topics with avro schemas’
----
2019-03-20 21:04:48 UTC - Joe Francis: @Shaun Noonan @Sam Leung Pulsar metadata
is stored in ZK. You will reach the limits of ZK in terms of how much ZK data
can be, and how quickly the cluster can come up from a Cold start.
----
2019-03-20 21:13:49 UTC - Sam Leung: @Joe Francis The ZK limit makes sense. Is
there a difference in how performant ZK would be if we defined 1 namespace with
1M topics, vs 1000 namespaces with 1000 topics?
Also can you elaborate on what the limit on cold start is?
----
2019-03-20 21:26:55 UTC - Mark Marijnissen: Figured it out - a misconfiguration
of the broker. I reduced the bookies to two, but didn't lower these values:
managedLedgerDefaultAckQuorum: 1
managedLedgerDefaultEnsembleSize: 1
managedLedgerDefaultWriteQuorum: 1
I'll have to read the architecture/docs to get a proper understanding of these
parameters, and understand how I can get a minimum yet production-safe (i.e.
with failovers and backup) configuration
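
A small sanity check for these three settings, as a sketch. It relies on the usual BookKeeper constraint that ensemble size >= write quorum >= ack quorum, and that a ledger cannot be created when the ensemble size exceeds the number of available bookies. The class name and the sample values are hypothetical:

```java
final class LedgerQuorumCheck {
    // Returns "ok" or a short description of the first violated constraint.
    static String validate(int bookies, int ensembleSize, int writeQuorum, int ackQuorum) {
        if (ackQuorum < 1) {
            return "ack quorum must be at least 1";
        }
        if (ackQuorum > writeQuorum) {
            return "ack quorum exceeds write quorum";
        }
        if (writeQuorum > ensembleSize) {
            return "write quorum exceeds ensemble size";
        }
        if (ensembleSize > bookies) {
            return "ensemble size exceeds available bookies";
        }
        return "ok";
    }

    public static void main(String[] args) {
        // Two bookies with an ensemble of three cannot work.
        System.out.println(validate(2, 3, 3, 2)); // prints ensemble size exceeds available bookies
        // Two bookies with every value set to two keeps a copy on each bookie.
        System.out.println(validate(2, 2, 2, 2)); // prints ok
    }
}
```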
----
2019-03-20 21:27:22 UTC - Matteo Merli: :+1:
----
2019-03-20 21:44:27 UTC - Chris Camplejohn: @Chris Camplejohn has joined the
channel
----
2019-03-20 21:57:41 UTC - Joe Francis: I would not set up a NS with 1M topics,
and I can't speak to what the limit is there. I would examine a use case which
requires that setup more closely. NS are a unit of administration: policy
management, monitoring and stats are at the NS level.
----
2019-03-20 21:59:51 UTC - Joe Francis: As for cold start - any large system
when it comes up, will stress the system metadata store, and your rate of
startup will depend on how long it takes to serve all the metadata required by
system startup. You will be limited by the limits of ZK
----
2019-03-21 00:58:36 UTC - Sijie Guo: @Sanjeev Kulkarni
> ‘currently schema detection only works for topics with avro schemas’
this is AUTO_CONSUME schema. this change has been there since months. It means
it is trying to consume a topic that is not avro.
I don’t think it is related to any recent changes to avro schema. since the
logic is out side of avro schema itself.
----
2019-03-21 00:59:42 UTC - Guy Feldman: anyone have recommendations for running
pulsar in production?
----
2019-03-21 01:00:23 UTC - Guy Feldman: configs to look out for? vms or
kubernetes?
----
2019-03-21 01:45:04 UTC - Shivji Kumar Jha: Yes i could achieve my goal using
the recent multi version schema provider for now. I will wait for the
AUTO_CONSUME wiring change :blush:
----
2019-03-21 01:46:23 UTC - Shivji Kumar Jha: Is the missing task assigned to
someone already in your team btw or can i contribute that? :grinning:
----
2019-03-21 02:28:00 UTC - bossbaby: With Message deduplication, Is pulsar
removing it based on messageid?
----
2019-03-21 02:28:25 UTC - Matteo Merli: It’s based on producer name and
sequence id
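
A simplified in-memory model of that rule, for illustration only. It assumes the broker keeps the highest sequence id seen per producer name and drops anything at or below it; the real broker also persists this state, which the sketch leaves out. The class name is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

final class DedupModel {
    // Highest sequence id accepted so far, keyed by producer name.
    private final Map<String, Long> highestSeen = new HashMap<>();

    // Returns true if the message is accepted, false if it is treated as a duplicate.
    boolean accept(String producerName, long sequenceId) {
        Long last = highestSeen.get(producerName);
        if (last != null && sequenceId <= last) {
            return false;
        }
        highestSeen.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        DedupModel model = new DedupModel();
        System.out.println(model.accept("producer-a", 1)); // prints true
        System.out.println(model.accept("producer-a", 1)); // prints false (resend)
        System.out.println(model.accept("producer-b", 1)); // prints true (different producer)
    }
}
```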
----
2019-03-21 02:28:55 UTC - Sijie Guo: @Shivji Kumar Jha I think @Penghui Li and
@CongBo from zhaopin.com are working on this part.
Let me ping Penghui offline to see where they are.
----
2019-03-21 02:29:22 UTC - Matteo Merli:
<https://www.slideshare.net/merlimat/effectivelyonce-semantics-in-apache-pulsar>
----
2019-03-21 02:36:12 UTC - bossbaby: So, does deduplication work on cursor
pulsar.repl?
----
2019-03-21 02:36:29 UTC - Matteo Merli: yes, it works on the replicator
producers
----
2019-03-21 02:36:38 UTC - Matteo Merli: (if enabled on the namespace policies)
----
2019-03-21 02:51:35 UTC - Sijie Guo: Talked with @Penghui Li and @CongBo. I
think @CongBo is working on this part.
----
2019-03-21 05:10:09 UTC - bossbaby: In the 3 cluster model (A, B, C). In case
if the data in B is deleted, is the cursor translation of cluster A and C
original to copy the data to ensure the order of packets is the same as the
replication from A and C?
Example:
A send: A1
C send: C1
A send: A2
C send: C2
Before reset cursor, result consume in cluster C: A1 C1 A2 C2
After resetting the cursor successfully, the consumer consumes messages with
the expected results:
A1 C1 A2 C2
----